changefeedccl: Webhook sink is slow #84676

Closed
miretskiy opened this issue Jul 19, 2022 · 1 comment · Fixed by #99086 · May be fixed by #95369
Labels: A-cdc (Change Data Capture) · C-enhancement (Solution expected to add code/behavior + preserve backward-compat; pg compat issues are exception) · T-cdc

miretskiy commented Jul 19, 2022

As documented in https://docs.google.com/document/d/1GT7r2oFtmp0Nc3UNZ1L_GFe85ozfyAG2vI7jsPrNCSQ/edit,
the webhook sink is excessively slow. It is not clear whether any temporary fixes are possible here.
However, the combination of a slow sink and a very slow mock webhook sink implementation makes
all webhook tests excessively slow: in a local test, processing a single message takes 50ms,
and sometimes over 1s.

Jira issue: CRDB-17793

Epic CRDB-11356

@miretskiy miretskiy added C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) A-cdc Change Data Capture T-cdc labels Jul 19, 2022
blathers-crl bot commented Jul 19, 2022

cc @cockroachdb/cdc

miretskiy pushed a commit to miretskiy/cockroach that referenced this issue Jul 19, 2022
Fixes cockroachdb#84121

cockroachdb#84007 added a timeout to the test feed library to prevent
flaky tests from hanging for a long time.  This timeout caused the
`TestChangefeedBackfillCheckpoint` test to become flaky.  The test was
slow mainly because it processes 1000 messages (twice), and because
the `webhook` sink and its mock sink implementation are very slow
(50+ms per message).

Webhook sink and mock webhook sink performance will be
addressed separately (cockroachdb#84676).

For now, marginally improve mock webhook sink performance
by detecting directly when messages become available, instead
of relying on resolved timestamps.  Also, significantly increase
the internal test timeout when reading many messages in a unit test.

While troubleshooting this issue, I observed a large number of
`http: TLS handshake error from 127.0.0.1:34276: EOF` error messages.
The problem is that the webhook sink specified an arbitrary and
very small default timeout of 3 seconds.  The Go library default
is 0 (no timeout), and we should use that default as well.
Fixes cockroachdb#75745

Release Notes: None
craig bot pushed a commit that referenced this issue Jul 20, 2022
84572: obsservice: update README r=andreimatei a=dhartunian

update README to reference the `./dev` command
which includes the UI build.

Release note: None

84678: changefeedccl: De-flake TestChangefeedBackfillCheckpoint test. r=miretskiy a=miretskiy

84682: opt: fix crdb_internal.decode_plan_gist to work with unknown index r=rytaft a=rytaft

Release note (bug fix): `crdb_internal.decode_plan_gist` will no longer
produce an internal error when it is used to decode a plan gist for which
no schema information is available.

Co-authored-by: David Hartunian <davidh@cockroachlabs.com>
Co-authored-by: Yevgeniy Miretskiy <yevgeniy@cockroachlabs.com>
Co-authored-by: Rebecca Taft <becca@cockroachlabs.com>
samiskin added a commit to samiskin/cockroach that referenced this issue Mar 22, 2023
Resolves cockroachdb#84676
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-11356

This PR reimplements the webhook sink on top of a more general
`batchingSink` framework, which makes adding new sinks easier and makes
the webhook sink far more performant than it was previously.

A follow-up PR will use `batchingSink` for the pubsub client, which
also suffers from performance issues.

---

Sink-specific code is encapsulated in a `SinkClient` interface:

```go
type SinkClient interface {
        MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
        MakeBatchWriter() BatchWriter
        Flush(context.Context, SinkPayload) error
        Close() error
}

type BatchWriter interface {
        AppendKV(key []byte, value []byte, topic string)
        ShouldFlush() bool
        Close() (SinkPayload, error)
}

type SinkPayload interface{}
```

Once the batch is ready to be flushed, the writer can be `Close()`d to
do any final formatting of the buffered data (e.g. wrapping it in a JSON
object with extra metadata) and obtain a final `SinkPayload` that is
ready to be passed to `SinkClient.Flush`.

`SinkClient` has a separate `MakeResolvedPayload` method because the sink
may require resolved events to be formatted differently from a batch of KVs.

`Flush(ctx, payload)` encapsulates sending a blocking IO request to the
sink endpoint, and may be called multiple times with the same payload
due to retries.  Any formatting work should therefore run in the
writer's `Close` and be stored in the `SinkPayload`, so that retried
calls to `Flush` do not repeat it.

---

`batchingSink` contains all the logic needed to turn a `SinkClient` into
a full `Sink` implementation.

```go
type batchingSink struct {
	client            SinkClient
	ioWorkers         int
	minFlushFrequency time.Duration
	retryOpts         retry.Options
	eventPool         sync.Pool
	batchPool         sync.Pool
	eventCh           chan interface{}
	pacer             *admission.Pacer
	// ...
}

var _ Sink = (*batchingSink)(nil)
```

It runs a single goroutine that handles:
- Creating, building up, and finalizing `BatchWriter`s to eventually
  form a `SinkPayload` to emit
- Flushing batches once they have been open longer than the configured
  `minFlushFrequency`
- Explicit flushes, including blocking until a flush has completed
- Logging all of the sink's metrics

`EmitRow` calls are thread-safe, so users of this sink do not need the
`safeSink` wrapper.

Events sent between goroutines would normally need to live on the
heap, but to avoid excessive garbage collection of hundreds of thousands
of tiny structs, both the `kvEvents{<data from EmitRow>}` events (sent from the
`EmitRow` caller to the batching worker) and the `sinkBatchBuffer{<data
about the batch>}` events (sent from the batching worker to the IO
routine described in the next section) are allocated from object pools.
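Pooling the per-row events might look like the sketch below, which uses the standard library's `sync.Pool`. The `kvEvent` fields and the `newKVEvent`/`releaseKVEvent` helpers are assumptions for illustration, not the sink's actual types.

```go
package main

import "sync"

// kvEvent is a hypothetical stand-in for the small per-row struct that
// EmitRow hands to the batching worker.
type kvEvent struct {
	key, value []byte
	topic      string
}

// eventPool recycles kvEvent structs so that high row rates do not create
// hundreds of thousands of short-lived heap objects for the GC to track.
var eventPool = sync.Pool{
	New: func() interface{} { return new(kvEvent) },
}

// newKVEvent takes a struct from the pool (or allocates one) and fills it.
func newKVEvent(key, value []byte, topic string) *kvEvent {
	e := eventPool.Get().(*kvEvent)
	e.key, e.value, e.topic = key, value, topic
	return e
}

// releaseKVEvent clears references so pooled events do not pin row data,
// then returns the struct to the pool once the worker has consumed it.
func releaseKVEvent(e *kvEvent) {
	*e = kvEvent{}
	eventPool.Put(e)
}
```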

---

For a sink like Cloudstorage, which uses large batches, doing the
above and flushing the batch payloads one by one on a separate
goroutine is good enough.  Unfortunately, the webhook sink can be
used with no batching at all by users who want the lowest latency
while still getting good throughput, so we need to be able to have
multiple requests in flight.  The difficulty is that if a batch with
keys [a1,b1] is in flight, a batch with keys [b2,c1] must block
until [a1,b1] completes, because b2 cannot be sent and risk arriving
at the destination before b1.

Flushing payloads in parallel while still maintaining key-ordering
guarantees is handled by a separate `parallelIO` struct.

```go
type parallelIO struct {
	retryOpts retry.Options
	ioHandler IOHandler
	requestCh chan IORequest
	resultCh  chan IORequest
	// ...
}

type IOHandler func(context.Context, IORequest) error

type IORequest interface {
	Keys() intsets.Fast
	SetError(error)
}
```

It uses one goroutine to manage the key-ordering guarantees and a
configurable number of IO worker goroutines that simply call `ioHandler`
on each `IORequest`.

Each `IORequest` exposes the keys it must not conflict on as an
`intsets.Fast`, which supports the efficient Union/Intersects/Difference
operations that `parallelIO` needs to maintain ordering guarantees.
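The ordering rule can be sketched with a small scheduler. Assumptions: keys are hashed to ints, a plain `map[int]struct{}` (the hypothetical `keySet`) stands in for `intsets.Fast`, and `orderedScheduler` is an illustration rather than `parallelIO` itself. Besides checking in-flight keys, a request also waits if it shares a key with an earlier request that is still blocked; otherwise, with [a1,b1] in flight and [b2,c1] waiting, a later [c2,d1] could be sent ahead of c1.

```go
package main

// keySet is a hypothetical stand-in for intsets.Fast.
type keySet map[int]struct{}

func (s keySet) intersects(o keySet) bool {
	for k := range s {
		if _, ok := o[k]; ok {
			return true
		}
	}
	return false
}

type request struct {
	id   int
	keys keySet
}

// orderedScheduler lets a request start only if none of its keys belong to an
// in-flight request or to an earlier request that is still waiting.
type orderedScheduler struct {
	inFlight keySet
	pending  []request // waiting requests, in arrival order
}

func newOrderedScheduler() *orderedScheduler {
	return &orderedScheduler{inFlight: keySet{}}
}

// submit queues r and returns the requests that can be dispatched now.
func (s *orderedScheduler) submit(r request) []request {
	s.pending = append(s.pending, r)
	return s.drain()
}

// complete releases r's keys and returns any requests it unblocked.
func (s *orderedScheduler) complete(r request) []request {
	for k := range r.keys {
		delete(s.inFlight, k)
	}
	return s.drain()
}

func (s *orderedScheduler) drain() []request {
	var ready, still []request
	blocked := keySet{} // keys of earlier requests that must keep waiting
	for _, r := range s.pending {
		if r.keys.intersects(s.inFlight) || r.keys.intersects(blocked) {
			for k := range r.keys {
				blocked[k] = struct{}{}
			}
			still = append(still, r)
			continue
		}
		for k := range r.keys {
			s.inFlight[k] = struct{}{}
		}
		ready = append(ready, r)
	}
	s.pending = still
	return ready
}
```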

Requests are received as `IORequest`s and results are returned as the
same `IORequest`s.  This way, `parallelIO` does not have to make any
heap allocations to communicate; its user can manage creating and
freeing these objects in pools.  The only heap allocations are part of
the `intsets` operations, since it uses a linked list internally.

---

The webhook sink therefore works as follows:
1. `EmitRow` is called, creating `kvEvents` that are sent to a batching worker.
2. The batching worker takes events and appends them to a batch.
3. Once the batch is full, it is encoded into an HTTP request.
4. The request object is then sharded across a set of IO workers to be
   sent out in parallel with other non-key-conflicting requests.

With this setup, the CPU flamegraph at high throughput showed little
time in the `batchingSink`/`parallelIO` machinery itself.  Most of the
work was in step 3: calling `json.Marshal` on a list of messages took
almost 10% of the time, specifically in a call to `json.Compact`.

Since that work isn't needed (all we are doing is putting a list of
already-formatted JSON messages into a surrounding JSON array and small
object), I also replaced `json.Marshal` with code that stitches the
bytes together manually in a buffer.
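A sketch of stitching the body together by hand with `bytes.Buffer`. The `{"payload":[...],"length":N}` envelope is an assumption about the webhook body format, and `encodeWebhookBody` is a hypothetical name; each message is assumed to already be valid JSON, since nothing here re-validates it (that validation and compaction is what `json.Marshal` does for embedded raw messages).

```go
package main

import (
	"bytes"
	"strconv"
)

// encodeWebhookBody wraps already-encoded JSON messages in an envelope by
// writing bytes directly instead of calling json.Marshal.
func encodeWebhookBody(messages [][]byte) []byte {
	// Rough capacity estimate: envelope, up to 20 length digits, and
	// each message plus its separating comma.
	size := len(`{"payload":[],"length":}`) + 20
	for _, m := range messages {
		size += len(m) + 1
	}
	var buf bytes.Buffer
	buf.Grow(size)
	buf.WriteString(`{"payload":[`)
	for i, m := range messages {
		if i > 0 {
			buf.WriteByte(',')
		}
		buf.Write(m)
	}
	buf.WriteString(`],"length":`)
	buf.WriteString(strconv.Itoa(len(messages)))
	buf.WriteByte('}')
	return buf.Bytes()
}
```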

---

Since Matt has talked about placing new emphasis on feature-flagging
new work to avoid the need for technical advisories, I placed this
new implementation behind the `changefeed.new_webhook_sink_enabled`
setting, which is disabled by default.

---

Release note (performance improvement): the webhook sink can now
handle a drastically higher maximum throughput when the
"changefeed.new_webhook_sink_enabled" cluster setting is enabled.
samiskin added a commit to samiskin/cockroach that referenced this issue Mar 22, 2023
samiskin added a commit to samiskin/cockroach that referenced this issue Mar 23, 2023
samiskin added a commit to samiskin/cockroach that referenced this issue Mar 28, 2023
Resolves cockroachdb#84676
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-11356

This PR reimplements the webhook sink on top of a more general
`batchingSink` framework, which makes adding new sinks easier and makes
the webhook sink far more performant than it was previously.

A follow-up PR will use `batchingSink` for the pubsub client, which
also suffers from performance issues.

---

Sink-specific code is encapsulated in a `SinkClient` interface:

```go
type SinkClient interface {
        MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
        MakeBatchBuffer() BatchBuffer
        Flush(context.Context, SinkPayload) error
        Close() error
}

type BatchBuffer interface {
        Append(key []byte, value []byte, topic string)
        ShouldFlush() bool
        Close() (SinkPayload, error)
}

type SinkPayload interface{}
```

Once the batch is ready to be flushed, the buffer can be `Close()`d to
do any final formatting of the buffered data (e.g. wrapping it in a JSON
object with extra metadata) and obtain a final `SinkPayload` that is
ready to be passed to `SinkClient.Flush`.

`SinkClient` has a separate `MakeResolvedPayload` method because the sink
may require resolved events to be formatted differently from a batch of KVs.

`Flush(ctx, payload)` encapsulates sending a blocking IO request to the
sink endpoint, and may be called multiple times with the same payload
due to retries.  Any formatting work should therefore run in the
buffer's `Close` and be stored in the `SinkPayload`, so that retried
calls to `Flush` do not repeat it.

---

`batchingSink` contains all the logic needed to turn a `SinkClient` into
a full `Sink` implementation.

```go
type batchingSink struct {
	client            SinkClient
	ioWorkers         int
	minFlushFrequency time.Duration
	retryOpts         retry.Options
	eventCh           chan interface{}
	pacer             *admission.Pacer
	// ...
}

var _ Sink = (*batchingSink)(nil)
```

It runs a single goroutine that handles:
- Creating, building up, and finalizing `BatchBuffer`s to eventually
  form a `SinkPayload` to emit
- Flushing batches once they have been open longer than the configured
  `minFlushFrequency`
- Explicit flushes, including blocking until a flush has completed
- Logging all of the sink's metrics

`EmitRow` calls are thread-safe, so users of this sink do not need the
`safeSink` wrapper.

Events sent between goroutines would normally need to live on the
heap, but to avoid excessive garbage collection of hundreds of thousands
of tiny structs, both the `kvEvents{<data from EmitRow>}` events (sent from the
`EmitRow` caller to the batching worker) and the `sinkBatchBuffer{<data
about the batch>}` events (sent from the batching worker to the IO
routine described in the next section) are allocated from object pools.

---

For a sink like Cloudstorage, which uses large batches, doing the
above and flushing the batch payloads one by one on a separate
goroutine is good enough.  Unfortunately, the webhook sink can be
used with no batching at all by users who want the lowest latency
while still getting good throughput, so we need to be able to have
multiple requests in flight.  The difficulty is that if a batch with
keys [a1,b1] is in flight, a batch with keys [b2,c1] must block
until [a1,b1] completes, because b2 cannot be sent and risk arriving
at the destination before b1.

Flushing payloads in parallel while still maintaining key-ordering
guarantees is handled by a separate `parallelIO` struct.

```go
type parallelIO struct {
	retryOpts retry.Options
	ioHandler IOHandler
	requestCh chan IORequest
	resultCh  chan *ioResult
	// ...
}

type IOHandler func(context.Context, IORequest) error

type IORequest interface {
	Keys() intsets.Fast
}

type ioResult struct {
	request IORequest
	err     error
}
```

It uses one goroutine to manage the key-ordering guarantees and a
configurable number of IO worker goroutines that simply call `ioHandler`
on each `IORequest`.

Each `IORequest` exposes the keys it must not conflict on as an
`intsets.Fast`, which supports the efficient Union/Intersects/Difference
operations that `parallelIO` needs to maintain ordering guarantees.

The request and its error (if one occurred despite the retries) are
returned on `resultCh`.

---

The webhook sink therefore works as follows:
1. `EmitRow` is called, creating `kvEvents` that are sent to a batching worker.
2. The batching worker takes events and appends them to a batch.
3. Once the batch is full, it is encoded into an HTTP request.
4. The request object is then sharded across a set of IO workers to be
   sent out in parallel with other non-key-conflicting requests.

With this setup, the CPU flamegraph at high throughput showed little
time in the `batchingSink`/`parallelIO` machinery itself.  Most of the
work was in step 3: calling `json.Marshal` on a list of messages took
almost 10% of the time, specifically in a call to `json.Compact`.

Since that work isn't needed (all we are doing is putting a list of
already-formatted JSON messages into a surrounding JSON array and small
object), I also replaced `json.Marshal` with code that stitches the
bytes together manually in a buffer.

---

Since Matt has talked about placing new emphasis on feature-flagging
new work to avoid the need for technical advisories, I placed this
new implementation behind the `changefeed.new_webhook_sink_enabled`
setting, which is disabled by default.

---

Release note (performance improvement): the webhook sink can now
handle a drastically higher maximum throughput when the
"changefeed.new_webhook_sink_enabled" cluster setting is enabled.
samiskin added a commit to samiskin/cockroach that referenced this issue Mar 30, 2023
Resolves cockroachdb#84676
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-11356

This PR implements the Webhook sink as part of a more general
`batchingSink` framework that can be used to make adding new sinks an
easier process, making it far more performant than it was previously.

A followup PR will be made to use the `batchingSink` for the pubsub
client which also suffers performance issues.

---

Sink-specific code is encapsulated in a SinkClient interface

```go
type SinkClient interface {
        MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
        MakeBatchBuffer() BatchBuffer
        Flush(context.Context, SinkPayload) error
        Close() error
}

type BatchBuffer interface {
        Append(key []byte, value []byte, topic string)
        ShouldFlush() bool
        Close() (SinkPayload, error)
}

type SinkPayload interface{}
```

Once the Batch is ready to be Flushed, the buffer can be `Close()`'d to
do any final formatting (ex: wrap in a json object with extra metadata)
of the buffer-able data and obtain a final `SinkPayload` that is ready
to be passed to `SinkClient.Flush`.

The `SinkClient` has a separate `MakeResolvedPayload` since the sink may
require resolved events be formatted differently to a batch of kvs.

`Flush(ctx, payload)` encapsulates sending a blocking IO request to the
sink endpoint, and may be called multiple times with the same payload
due to retries.  Any kind of formatting work should be served to run in
the buffer's `Close` and stored as a `SinkPayload` to avoid multiple
calls to `Flush` repeating work upon retries.

---

The `batchingSink` handles all the logic to take a SinkClient and form a
full Sink implementation.

```go
type batchingSink struct {
        client             SinkClient
        ioWorkers          int
        minFlushFrequency  time.Duration
        retryOpts          retry.Options
        eventCh            chan interface{}
        pacer              *admission.Pacer
        ...
}

var _ Sink = (*batchingSink)(nil)
```

It involves a single goroutine which handles:
- Creating, building up, and finalizing `BatchBuffer`s to eventually
form a `SinkPayload` to emit
- Flushing batches when they have persisted longer than a configured
`minFlushFrequency`
- Flushing deliberately and being able to block until the Flush has completed
- Logging all the various sink metrics

`EmitRow` calls are thread-safe therefore the use of the `safeSink` wrapper is
not required for users of this sink.

Events passed between these goroutines would normally have to be
heap-allocated.  To avoid excessive garbage collection of hundreds of
thousands of tiny structs, both the `kvEvents{<data from EmitRow>}`
events (sent from the `EmitRow` caller to the batching worker) and the
`sinkBatchBuffer{<data about the batch>}` events (sent from the batching
worker to the IO routine described in the next section) are allocated
from object pools.

---

For a sink like cloudstorage, where batches are large, doing the above
and flushing the batch payloads one by one on a separate goroutine is
good enough.  Unfortunately, the webhook sink can be used with no
batching at all, by users who want the lowest possible latency while
still getting good throughput.  This means we need to be able to have
multiple requests in flight.  The difficulty is that if a batch with
keys [a1,b1] is in flight, a batch with keys [b2,c1] must block until
[a1,b1] completes; otherwise b2 could arrive at the destination before
b1.

Flushing payloads in parallel while still maintaining key-ordering
guarantees is handled by a separate `parallelIO` struct.

```go
type parallelIO struct {
	retryOpts retry.Options
	ioHandler IOHandler
	requestCh chan IORequest
	resultCh  chan *ioResult
	// ...
}

type IOHandler func(context.Context, IORequest) error

type IORequest interface {
	Keys() intsets.Fast
}

type ioResult struct {
	request IORequest
	err     error
}
```

It uses one goroutine to enforce the key-ordering guarantees and a
configurable number of IO worker goroutines that simply call `ioHandler`
on each `IORequest`.

Each `IORequest` exposes the keys it must not conflict on as an
`intsets.Fast`, which supports the efficient Union/Intersects/Difference
operations that `parallelIO` needs to maintain ordering guarantees.

Each request is returned on `resultCh` together with its error, if one
still occurred after retries.

---

The webhook sink is therefore formed as follows:
1. `EmitRow` is called, creating kvEvents that are sent to a batching worker
2. The batching worker takes events and appends them to a batch
3. Once the batch is full, it is encoded into an HTTP request
4. The request object is then sharded across a set of IO workers to be
   sent out in parallel with other requests whose keys do not conflict.

With this setup, the CPU flamegraph at high throughput showed little of
the `batchingSink`/`parallelIO` work.  Most of the time went to step 3:
calling `json.Marshal` on a list of messages took almost 10% of the
time, specifically in a call to `json.Compact`.

Since that compaction is unnecessary (all we are doing is putting a
list of already-formatted JSON messages into a surrounding JSON array
and a small object), I also replaced `json.Marshal` with code that
writes the characters directly into a buffer.

---

Since Matt has talked about placing new emphasis on feature-flagging
new work to avoid the need for technical advisories, I placed this new
implementation behind the `changefeed.new_webhook_sink_enabled` setting
and disabled it by default.

---

Release note (performance improvement): the webhook sink can now
handle a drastically higher maximum throughput when the
"changefeed.new_webhook_sink_enabled" cluster setting is enabled.
craig bot pushed a commit that referenced this issue Mar 31, 2023
99086: changefeedccl: webhook sink rewrite r=samiskin a=samiskin

Resolves #84676
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-11356

This PR reimplements the webhook sink as part of a more general `batchingSink` framework, which is meant to make adding new sinks easier, and makes the webhook sink far more performant than it was previously.

A follow-up PR will use the `batchingSink` for the pubsub client, which also suffers from performance issues.

---

Sink-specific code is encapsulated in a `SinkClient` interface:

```go
type SinkClient interface {
	MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
	MakeBatchWriter() BatchWriter
	Flush(context.Context, SinkPayload) error
	Close() error
}

type BatchWriter interface {
	AppendKV(key []byte, value []byte, topic string)
	ShouldFlush() bool
	Close() (SinkPayload, error)
}

type SinkPayload interface{}
```

Once the batch is ready to be flushed, the writer can be `Close()`'d to do any final formatting of the buffered data (e.g. wrapping it in a JSON object with extra metadata) and obtain a final `SinkPayload` that is ready to be passed to `SinkClient.Flush`.

The `SinkClient` has a separate `MakeResolvedPayload` since the sink may require resolved events to be formatted differently from a batch of KVs.

`Flush(ctx, payload)` encapsulates sending a blocking IO request to the sink endpoint, and may be called multiple times with the same payload due to retries.  Any formatting work should therefore be done in the writer's `Close` and stored in the `SinkPayload`, so that retried calls to `Flush` do not repeat it.

---

The `batchingSink` handles all the logic needed to turn a `SinkClient` into a full `Sink` implementation.

```go
type batchingSink struct {
	client            SinkClient
	ioWorkers         int
	minFlushFrequency time.Duration
	retryOpts         retry.Options
	eventPool         sync.Pool
	batchPool         sync.Pool
	eventCh           chan interface{}
	pacer             *admission.Pacer
	// ...
}

var _ Sink = (*batchingSink)(nil)
```

It runs a single goroutine that handles:
- Creating, building up, and finalizing `BatchWriter`s to eventually form a `SinkPayload` to emit
- Flushing batches that have been held longer than the configured `minFlushFrequency`
- Explicit flushes, including blocking until the flush has completed
- Logging all the various sink metrics

`EmitRow` is thread-safe, so users of this sink do not need the `safeSink` wrapper.

Events passed between these goroutines would normally have to be heap-allocated.  To avoid excessive garbage collection of hundreds of thousands of tiny structs, both the `kvEvents{<data from EmitRow>}` events (sent from the `EmitRow` caller to the batching worker) and the `sinkBatchBuffer{<data about the batch>}` events (sent from the batching worker to the IO routine described in the next section) are allocated from object pools.

---

For a sink like cloudstorage, where batches are large, doing the above and flushing the batch payloads one by one on a separate goroutine is good enough.  Unfortunately, the webhook sink can be used with no batching at all, by users who want the lowest possible latency while still getting good throughput.  This means we need to be able to have multiple requests in flight.  The difficulty is that if a batch with keys [a1,b1] is in flight, a batch with keys [b2,c1] must block until [a1,b1] completes; otherwise b2 could arrive at the destination before b1.

Flushing payloads in parallel while still maintaining key-ordering guarantees is handled by a separate `parallelIO` struct.

```go
type parallelIO struct {
	retryOpts retry.Options
	ioHandler IOHandler
	requestCh chan IORequest
	resultCh  chan IORequest
	// ...
}

type IOHandler func(context.Context, IORequest) error

type IORequest interface {
	Keys() intsets.Fast
	SetError(error)
}
```

It uses one goroutine to enforce the key-ordering guarantees and a configurable number of IO worker goroutines that simply call `ioHandler` on each `IORequest`.

Each `IORequest` exposes the keys it must not conflict on as an `intsets.Fast`, which supports the efficient Union/Intersects/Difference operations that `parallelIO` needs to maintain ordering guarantees.

Requests are received as `IORequest`s, and results are returned as `IORequest`s too, with any error attached via `SetError`.  This way `parallelIO` does not have to make any heap allocations to communicate; its user can create and free these objects in pools.  The only heap allocations that occur are part of the `intsets` operations, since it uses a linked list internally.

---

The webhook sink is therefore formed as follows:
1. `EmitRow` is called, creating kvEvents that are sent to a batching worker
2. The batching worker takes events and appends them to a batch
3. Once the batch is full, it is encoded into an HTTP request
4. The request object is then sharded across a set of IO workers to be sent out in parallel with other requests whose keys do not conflict.

With this setup, the CPU flamegraph at high throughput showed little of the `batchingSink`/`parallelIO` work.  Most of the time went to step 3: calling `json.Marshal` on a list of messages took almost 10% of the time, specifically in a call to `json.Compact`.

Since that compaction is unnecessary (all we are doing is putting a list of already-formatted JSON messages into a surrounding JSON array and a small object), I also replaced `json.Marshal` with code that writes the characters directly into a buffer.

---

In the following flamegraph of a node at around 35% CPU usage, only 5.56% of the total CPU time in the graph (three small chunks between the parallelEventConsumer chunk and the kvevent chunk) is taken up by the parallelIO and batchingSink workers.  This is with a batch size of 100.

<img width="1876" alt="Screenshot 2023-03-20 at 10 48 33 PM" src="https://user-images.githubusercontent.com/6236424/226507074-00d293ac-e929-478c-9a4b-24b40e0f7bab.png">

Maximum CPU usage here was around 37%, with a maximum throughput of 135k for a single node (the other nodes had run out of data by this point).  Since most of the flamegraph is time spent in the event-processing code, I'm going to assume this will be handled by the pacer and won't be much of an issue.

In the above flamegraph `runtime.gcDrain` does show up at 10.55% CPU, but the cloudstorage sink showed around the same value, so I'm guessing there isn't an additional GC thrashing issue.  I believe the only non-pooled allocations that occur are the intsets.

---

Since Matt has talked about placing new emphasis on feature-flagging new work to avoid the need for technical advisories, I placed this new implementation behind the `changefeed.new_webhook_sink_enabled` setting and disabled it by default.

For now the new code lives in `sink_webhook_v2` so that `sink_webhook.go` stays unchanged and this review is easier.  I may rename `sink_webhook` to `deprecated_sink_webhook` and `sink_webhook_v2` to `sink_webhook` before merging.

---

Release note (performance improvement): the webhook sink can now handle a drastically higher maximum throughput when the "changefeed.new_webhook_sink_enabled" cluster setting is enabled.

Co-authored-by: Shiranka Miskin <shiranka.miskin@gmail.com>
@craig craig bot closed this as completed in 8cb7444 Mar 31, 2023
samiskin added a commit to samiskin/cockroach that referenced this issue Apr 4, 2023
blathers-crl bot pushed a commit that referenced this issue Apr 4, 2023