From 5866b210ff81534c605de1ed3f42e7950caebc5b Mon Sep 17 00:00:00 2001
From: Shiranka Miskin
Date: Tue, 21 Mar 2023 16:51:37 +0000
Subject: [PATCH] changefeedccl: webhook sink refactor

Resolves #84676
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-11356

This PR reimplements the Webhook sink as part of a more general
`batchingSink` framework that can be used to make adding new sinks an
easier process, and it makes the sink far more performant than it was
previously.  A followup PR will be made to use the `batchingSink` for the
pubsub client, which also suffers from performance issues.

---

Sink-specific code is encapsulated in a SinkClient interface

```go
type SinkClient interface {
  MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
  MakeBatchBuffer() BatchBuffer
  Flush(context.Context, SinkPayload) error
  Close() error
}

type BatchBuffer interface {
  Append(key []byte, value []byte, topic string)
  ShouldFlush() bool
  Close() (SinkPayload, error)
}

type SinkPayload interface{}
```

Once the batch is ready to be flushed, the buffer can be `Close()`'d to do
any final formatting of the buffered data (ex: wrapping it in a JSON
object with extra metadata) and obtain a final `SinkPayload` that is ready
to be passed to `SinkClient.Flush`.

The `SinkClient` has a separate `MakeResolvedPayload` since the sink may
require resolved events to be formatted differently from a batch of kvs.

`Flush(ctx, payload)` encapsulates sending a blocking IO request to the
sink endpoint, and may be called multiple times with the same payload due
to retries.  Any kind of formatting work should be deferred to the
buffer's `Close` and stored as a `SinkPayload` to avoid multiple calls to
`Flush` repeating work upon retries.

---

The `batchingSink` handles all the logic to take a SinkClient and form a
full Sink implementation.

```go
type batchingSink struct {
  client            SinkClient
  ioWorkers         int
  minFlushFrequency time.Duration
  retryOpts         retry.Options
  eventCh           chan interface{}
  pacer             *admission.Pacer
  ...
}

var _ Sink = (*batchingSink)(nil)
```

It involves a single goroutine which handles:
- Creating, building up, and finalizing `BatchBuffer`s to eventually form
  a `SinkPayload` to emit
- Flushing batches when they have persisted longer than a configured
  `minFlushFrequency`
- Flushing deliberately and being able to block until the Flush has
  completed
- Logging all the various sink metrics

`EmitRow` calls are thread-safe, therefore the use of the `safeSink`
wrapper is not required for users of this sink.

Events sent through the goroutines would normally need to exist on the
heap, but to avoid excessive garbage collection of hundreds of thousands
of tiny structs, both the `rowEvent{}` events (sent from the EmitRow
caller to the batching worker) and the `sinkBatch{}` events (sent from the
batching worker to the IO routine in the next section) are allocated on
object pools.

---

For a sink like Cloudstorage where there are large batches, doing the
above and simply flushing the batch payloads one-by-one on a separate
routine is good enough.  Unfortunately the Webhook sink can be used with
no batching at all by users wanting the lowest latency while still having
good throughput.  This means we need to be able to have multiple requests
in flight.  The difficulty here is that if a batch with keys [a1,b1] is in
flight, a batch with keys [b2,c1] needs to block until [a1,b1] completes,
as b2 cannot be sent and risk arriving at the destination prior to b1.
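As a purely illustrative sketch of the `SinkClient`/`BatchBuffer` contract
above (the real webhook buffer lives in `sink_webhook_v2.go`; the type
name and flush threshold here are invented, and the standard `bytes`
package is assumed to be imported), a minimal buffer that collects
pre-encoded JSON messages and stitches them into a JSON array on `Close`
might look like:

```go
// exampleJSONBuffer is a hypothetical BatchBuffer: values are assumed to
// already be encoded JSON messages, and all final encoding happens in
// Close so that retries of Flush never repeat it.
type exampleJSONBuffer struct {
  messages [][]byte
  numBytes int
}

func (b *exampleJSONBuffer) Append(key []byte, value []byte, topic string) {
  b.messages = append(b.messages, value)
  b.numBytes += len(value)
}

func (b *exampleJSONBuffer) ShouldFlush() bool {
  // Flush once roughly 1MB of message data has accumulated (illustrative
  // threshold only).
  return b.numBytes >= 1<<20
}

func (b *exampleJSONBuffer) Close() (SinkPayload, error) {
  // Stitch the already-encoded messages into a surrounding JSON array by
  // hand rather than re-marshalling the whole batch.
  var buf bytes.Buffer
  buf.WriteByte('[')
  for i, msg := range b.messages {
    if i > 0 {
      buf.WriteByte(',')
    }
    buf.Write(msg)
  }
  buf.WriteByte(']')
  return buf.Bytes(), nil
}
```

Because `Close` produces the final `SinkPayload`, `Flush` can be retried
on the same payload without re-encoding anything.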
Flushing out payloads in a way that both maintains key-ordering guarantees
and is able to run in parallel is done by a separate `parallelIO` struct.

```go
type parallelIO struct {
  retryOpts retry.Options
  ioHandler IOHandler

  requestCh chan IORequest
  resultCh  chan *ioResult
  ...
}

type IOHandler func(context.Context, IORequest) error

type IORequest interface {
  Keys() intsets.Fast
}

type ioResult struct {
  request IORequest
  err     error
}
```

It involves one goroutine to manage the key-ordering guarantees and a
configurable number of IO worker goroutines that simply call `ioHandler`
on an `IORequest`.

IORequests represent the keys they shouldn't conflict on by providing an
`intsets.Fast` struct, which allows for the efficient
Union/Intersects/Difference operations on them that `parallelIO` needs to
maintain ordering guarantees.

The request and its error (if one occurred despite the retries) are
returned on resultCh.

---

The webhook sink is therefore formed by:
1. EmitRow is called, creating row events that are sent to a batching
   worker
2. The batching worker takes events and appends them to a batch
3. Once the batch is full, it is encoded into an HTTP request
4. The request object is then sharded across a set of IO workers to be
   fully sent out in parallel with other non-key-conflicting requests.

With this setup, at high throughputs most of the `batchingSink`/
`parallelIO` work barely showed up in the CPU flamegraph; the time was
largely spent in step 3, where taking a list of messages and calling
`json.Marshal` on it took almost 10% of the time, specifically in a call
to `json.Compress`.  Since this isn't needed, and all we're doing is
putting a list of already-formatted JSON messages into a surrounding JSON
array and small object, I also swapped `json.Marshal` for manually
stitching the bytes together into a buffer.

---

Since Matt has talked about a new significance being placed on feature
flagging new work to avoid the need for technical advisories, I placed
this new implementation behind the `changefeed.new_webhook_sink_enabled`
cluster setting, which defaults to disabled.

---

Release note (performance improvement): the webhook sink is now able to
handle a drastically higher maximum throughput by enabling the
"changefeed.new_webhook_sink_enabled" cluster setting.
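For illustration, a rough sketch of the conflict bookkeeping `parallelIO`
performs with the `Keys()` sets described above (the function and
variable names are invented for this example and the `intsets` util
package is assumed to be imported; the actual logic, including the
pending queue, admit-time tracking, and metrics, lives in
`parallel_io.go`):

```go
// exampleConflictTracking shows how an intsets.Fast union of in-flight
// keys gates which requests may be sent to the IO workers.
func exampleConflictTracking(incoming []IORequest) {
  var inflight intsets.Fast // union of Keys() of every in-flight request
  var pending []IORequest   // requests blocked behind a conflicting key

  for _, req := range incoming {
    if inflight.Intersects(req.Keys()) {
      // A key in this request is already being flushed; sending it now
      // could let it reach the sink before the earlier request, so it
      // must wait until the conflicting request completes.
      pending = append(pending, req)
      continue
    }
    inflight.UnionWith(req.Keys())
    // ... hand req to an IO worker here ...
  }

  // When a worker reports a completed request, its keys are removed from
  // the in-flight set so pending requests can be re-checked:
  //   inflight.DifferenceWith(completed.Keys())
}
```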
--- .../settings/settings-for-tenants.txt | 1 + docs/generated/settings/settings.html | 1 + pkg/ccl/changefeedccl/BUILD.bazel | 4 + pkg/ccl/changefeedccl/batching_sink.go | 484 ++++++++++++++++++ .../changefeedccl/changefeed_processors.go | 11 +- pkg/ccl/changefeedccl/changefeed_test.go | 12 +- .../changefeedccl/changefeedbase/settings.go | 28 +- pkg/ccl/changefeedccl/event_processing.go | 8 +- pkg/ccl/changefeedccl/helpers_test.go | 11 +- pkg/ccl/changefeedccl/metrics.go | 51 ++ pkg/ccl/changefeedccl/parallel_io.go | 282 ++++++++++ pkg/ccl/changefeedccl/sink.go | 202 +++++++- pkg/ccl/changefeedccl/sink_test.go | 83 +++ pkg/ccl/changefeedccl/sink_webhook.go | 131 ++--- pkg/ccl/changefeedccl/sink_webhook_test.go | 26 +- pkg/ccl/changefeedccl/sink_webhook_v2.go | 380 ++++++++++++++ pkg/ccl/changefeedccl/telemetry.go | 10 + pkg/cmd/roachtest/tests/cdc.go | 1 + 18 files changed, 1593 insertions(+), 133 deletions(-) create mode 100644 pkg/ccl/changefeedccl/batching_sink.go create mode 100644 pkg/ccl/changefeedccl/parallel_io.go create mode 100644 pkg/ccl/changefeedccl/sink_webhook_v2.go diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index a40331d84d4b..6501f5f7c87e 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -17,6 +17,7 @@ changefeed.event_consumer_workers integer 0 the number of workers to use when pr changefeed.fast_gzip.enabled boolean true use fast gzip implementation changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables +changefeed.sink_io_workers integer 0 the number of workers used by changefeeds when sending requests to the sink (currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage cloudstorage.timeout duration 10m0s the timeout for import/export storage operations diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 01de5764264d..f65e5d4ce5de 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -23,6 +23,7 @@
changefeed.fast_gzip.enabled
booleantrueuse fast gzip implementation
changefeed.node_throttle_config
stringspecifies node level throttling configuration for all changefeeeds
changefeed.schema_feed.read_with_priority_after
duration1m0sretry with high priority if we were not able to read descriptors for too long; 0 disables +
changefeed.sink_io_workers
integer0the number of workers used by changefeeds when sending requests to the sink (currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value.
cloudstorage.azure.concurrent_upload_buffers
integer1controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload
cloudstorage.http.custom_ca
stringcustom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage
cloudstorage.timeout
duration10m0sthe timeout for import/export storage operations diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index b93b62ac8cbf..7972fb968fb3 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "alter_changefeed_stmt.go", "authorization.go", "avro.go", + "batching_sink.go", "changefeed.go", "changefeed_dist.go", "changefeed_processors.go", @@ -20,6 +21,7 @@ go_library( "event_processing.go", "metrics.go", "name.go", + "parallel_io.go", "parquet_sink_cloudstorage.go", "retry.go", "scheduled_changefeed.go", @@ -32,6 +34,7 @@ go_library( "sink_pubsub.go", "sink_sql.go", "sink_webhook.go", + "sink_webhook_v2.go", "telemetry.go", "testing_knobs.go", "tls.go", @@ -125,6 +128,7 @@ go_library( "//pkg/util/hlc", "//pkg/util/httputil", "//pkg/util/humanizeutil", + "//pkg/util/intsets", "//pkg/util/json", "//pkg/util/log", "//pkg/util/log/eventpb", diff --git a/pkg/ccl/changefeedccl/batching_sink.go b/pkg/ccl/changefeedccl/batching_sink.go new file mode 100644 index 000000000000..0450222581c9 --- /dev/null +++ b/pkg/ccl/changefeedccl/batching_sink.go @@ -0,0 +1,484 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "fmt" + "hash" + "sync" + "time" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/util/admission" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/intsets" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// SinkClient is an interface to an external sink, where messages are written +// into batches as they arrive and once ready are flushed out. +type SinkClient interface { + MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) + MakeBatchBuffer() BatchBuffer + Flush(context.Context, SinkPayload) error + Close() error +} + +// BatchBuffer is an interface to aggregate KVs into a payload that can be sent +// to the sink. +type BatchBuffer interface { + Append(key []byte, value []byte, topic string) + ShouldFlush() bool + + // Once all data has been Append'ed, Close can be called to return a finalized + // Payload that is valid for a pure IO operation via Flush. All final encoding + // work (ex: wrapping an array in an object with metadata) should be done in + // Close, as non-io-related work done in Flush would be unnecessarily repeated + // upon retries. + Close() (SinkPayload, error) +} + +// SinkPayload is an interface representing a sink-specific representation of a +// batch of messages that is ready to be emitted by its Flush method. +type SinkPayload interface{} + +// batchingSink wraps a SinkClient to provide a Sink implementation that calls +// the SinkClient methods to form batches and flushes those batches across +// multiple parallel IO workers. 
+type batchingSink struct { + client SinkClient + topicNamer *TopicNamer + concreteType sinkType + + ioWorkers int + minFlushFrequency time.Duration + retryOpts retry.Options + + ts timeutil.TimeSource + metrics metricsRecorder + knobs batchingSinkKnobs + + // eventCh is the channel used to send requests from the Sink caller routines + // to the batching routine. Messages can either be a flushReq or a rowEvent. + eventCh chan interface{} + + pacer *admission.Pacer + pacerFactory func() *admission.Pacer + + termErr error + wg ctxgroup.Group + hasher hash.Hash32 + serverCfg *execinfra.ServerConfig + doneCh chan struct{} +} + +type batchingSinkKnobs struct { + OnAppend func(*rowEvent) +} + +type flushReq struct { + waiter chan struct{} +} + +type rowEvent struct { + key []byte + val []byte + topic string + + alloc kvevent.Alloc + mvcc hlc.Timestamp +} + +// Flush implements the Sink interface, returning the first error that has +// occured in the past EmitRow calls. +func (s *batchingSink) Flush(ctx context.Context) error { + defer s.metrics.recordFlushRequestCallback()() + flushWaiter := make(chan struct{}) + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.doneCh: + case s.eventCh <- flushReq{waiter: flushWaiter}: + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.doneCh: + return nil + case <-flushWaiter: + if s.termErr != nil { + return s.termErr + } + } + + // Refresh the pacer in case any settings have changed. s.pacer can safely be + // assigned since once the Flush has completed waiting, no new messages exist + // to be processed so pacer.Pace won't be called by the batching worker. + s.pacer = s.pacerFactory() + + return nil +} + +var _ Sink = (*batchingSink)(nil) + +// Event structs and batch structs which are transferred across routines (and +// therefore escape to the heap) can both be incredibly frequent (every event +// may be its own batch) and temporary, so to avoid GC thrashing they are both +// claimed and freed from object pools. +var eventPool sync.Pool = sync.Pool{ + New: func() interface{} { + return new(rowEvent) + }, +} + +func newRowEvent() *rowEvent { + return eventPool.Get().(*rowEvent) +} +func freeRowEvent(e *rowEvent) { + *e = rowEvent{} + eventPool.Put(e) +} + +var batchPool sync.Pool = sync.Pool{ + New: func() interface{} { + return new(sinkBatch) + }, +} + +func newSinkBatch() *sinkBatch { + return batchPool.Get().(*sinkBatch) +} +func freeSinkBatchEvent(b *sinkBatch) { + *b = sinkBatch{} + batchPool.Put(b) +} + +// EmitRow implements the Sink interface. +func (s *batchingSink) EmitRow( + ctx context.Context, + topic TopicDescriptor, + key, value []byte, + updated, mvcc hlc.Timestamp, + alloc kvevent.Alloc, +) error { + s.metrics.recordMessageSize(int64(len(key) + len(value))) + + payload := newRowEvent() + payload.key = key + payload.val = value + payload.topic = "" // unimplemented for now + payload.mvcc = mvcc + payload.alloc = alloc + + select { + case <-ctx.Done(): + return ctx.Err() + case s.eventCh <- payload: + case <-s.doneCh: + } + + return nil +} + +// EmitResolvedTimestamp implements the Sink interface. 
+func (s *batchingSink) EmitResolvedTimestamp( + ctx context.Context, encoder Encoder, resolved hlc.Timestamp, +) error { + data, err := encoder.EncodeResolvedTimestamp(ctx, "", resolved) + if err != nil { + return err + } + payload, err := s.client.MakeResolvedPayload(data, "") + if err != nil { + return err + } + + if err = s.Flush(ctx); err != nil { + return err + } + return retry.WithMaxAttempts(ctx, s.retryOpts, s.retryOpts.MaxRetries+1, func() error { + return s.client.Flush(ctx, payload) + }) +} + +// Close implements the Sink interface. +func (s *batchingSink) Close() error { + close(s.doneCh) + _ = s.wg.Wait() + s.pacer.Close() + return s.client.Close() +} + +// Dial implements the Sink interface. +func (s *batchingSink) Dial() error { + return nil +} + +// getConcreteType implements the Sink interface. +func (s *batchingSink) getConcreteType() sinkType { + return s.concreteType +} + +// sinkBatch stores an in-progress/complete batch of messages, along with +// metadata related to the batch. +type sinkBatch struct { + buffer BatchBuffer + payload SinkPayload // payload is nil until FinalizePayload has been called + + numMessages int + numKVBytes int // the total amount of uncompressed kv data in the batch + keys intsets.Fast // the set of keys within the batch to provide to parallelIO + bufferTime time.Time // the earliest time a message was inserted into the batch + mvcc hlc.Timestamp + + alloc kvevent.Alloc + hasher hash.Hash32 +} + +// FinalizePayload closes the writer to produce a payload that is ready to be +// Flushed by the SinkClient. +func (sb *sinkBatch) FinalizePayload() error { + payload, err := sb.buffer.Close() + if err != nil { + return err + } + sb.payload = payload + return nil +} + +// Keys implements the IORequest interface. +func (sb *sinkBatch) Keys() intsets.Fast { + return sb.keys +} + +func (sb *sinkBatch) isEmpty() bool { + return sb.numMessages == 0 +} + +func hashToInt(h hash.Hash32, buf []byte) int { + h.Reset() + h.Write(buf) + return int(h.Sum32()) +} + +// Append adds the contents of a kvEvent to the batch, merging its alloc pool. +func (sb *sinkBatch) Append(e *rowEvent) { + if sb.isEmpty() { + sb.bufferTime = timeutil.Now() + } + + sb.buffer.Append(e.key, e.val, e.topic) + + sb.keys.Add(hashToInt(sb.hasher, e.key)) + sb.numMessages += 1 + sb.numKVBytes += len(e.key) + len(e.val) + + if sb.mvcc.IsEmpty() || e.mvcc.Less(sb.mvcc) { + sb.mvcc = e.mvcc + } + + sb.alloc.Merge(&e.alloc) +} + +func (s *batchingSink) handleError(err error) { + if s.termErr == nil { + s.termErr = err + } +} + +func (s *batchingSink) newBatchBuffer() *sinkBatch { + batch := newSinkBatch() + batch.buffer = s.client.MakeBatchBuffer() + batch.hasher = s.hasher + return batch +} + +// runBatchingWorker combines 1 or more row events into batches, sending the IO +// requests out either once the batch is full or a flush request arrives. +func (s *batchingSink) runBatchingWorker(ctx context.Context) { + batchBuffer := s.newBatchBuffer() + + // Once finalized, batches are sent to a parallelIO struct which handles + // performing multiple Flushes in parallel while maintaining Keys() ordering. 
+ ioHandler := func(ctx context.Context, req IORequest) error { + batch, _ := req.(*sinkBatch) + defer s.metrics.recordSinkIOInflightChange(int64(-batch.numMessages)) + s.metrics.recordSinkIOInflightChange(int64(batch.numMessages)) + return s.client.Flush(ctx, batch.payload) + } + ioEmitter := newParallelIO(ctx, s.retryOpts, s.ioWorkers, ioHandler, s.metrics) + defer ioEmitter.Close() + + // Flushing requires tracking the number of inflight messages and confirming + // completion to the requester once the counter reaches 0. + inflight := 0 + var sinkFlushWaiter chan struct{} + + handleResult := func(result *ioResult) { + batch, _ := result.request.(*sinkBatch) + + if result.err != nil { + s.handleError(result.err) + } else { + s.metrics.recordEmittedBatch( + batch.bufferTime, batch.numMessages, batch.mvcc, batch.numKVBytes, sinkDoesNotCompress, + ) + } + + inflight -= batch.numMessages + + if (result.err != nil || inflight == 0) && sinkFlushWaiter != nil { + close(sinkFlushWaiter) + sinkFlushWaiter = nil + } + + freeIOResult(result) + batch.alloc.Release(ctx) + freeSinkBatchEvent(batch) + } + + tryFlushBatch := func() error { + if batchBuffer.isEmpty() { + return nil + } + toFlush := batchBuffer + batchBuffer = s.newBatchBuffer() + + if err := toFlush.FinalizePayload(); err != nil { + return err + } + + // Emitting needs to also handle any incoming results to avoid a deadlock + // with trying to emit while the emitter is blocked on returning a result. + for { + select { + case <-ctx.Done(): + return ctx.Err() + case ioEmitter.requestCh <- toFlush: + case result := <-ioEmitter.resultCh: + handleResult(result) + continue + case <-s.doneCh: + } + break + } + + return nil + } + + flushTimer := s.ts.NewTimer() + defer flushTimer.Stop() + + for { + select { + case req := <-s.eventCh: + if err := s.pacer.Pace(ctx); err != nil { + if pacerLogEvery.ShouldLog() { + log.Errorf(ctx, "automatic sink batcher pacing: %v", err) + } + } + + switch r := req.(type) { + case *rowEvent: + if s.termErr != nil { + continue + } + + inflight += 1 + + // If we're about to append to an empty batch, start the timer to + // guarantee the messages do not stay buffered longer than the + // configured frequency. + if batchBuffer.isEmpty() && s.minFlushFrequency > 0 { + flushTimer.Reset(s.minFlushFrequency) + } + + batchBuffer.Append(r) + if s.knobs.OnAppend != nil { + s.knobs.OnAppend(r) + } + + // The event struct can be freed as the contents are expected to be + // managed by the batch instead. 
+ freeRowEvent(r) + + if batchBuffer.buffer.ShouldFlush() { + s.metrics.recordSizeBasedFlush() + if err := tryFlushBatch(); err != nil { + s.handleError(err) + } + } + case flushReq: + if inflight == 0 || s.termErr != nil { + close(r.waiter) + } else { + sinkFlushWaiter = r.waiter + if err := tryFlushBatch(); err != nil { + s.handleError(err) + } + } + default: + s.handleError(fmt.Errorf("received unknown request of unknown type: %v", r)) + } + case result := <-ioEmitter.resultCh: + handleResult(result) + case <-flushTimer.Ch(): + flushTimer.MarkRead() + if err := tryFlushBatch(); err != nil { + s.handleError(err) + } + case <-ctx.Done(): + return + case <-s.doneCh: + return + } + } +} + +func makeBatchingSink( + ctx context.Context, + concreteType sinkType, + client SinkClient, + minFlushFrequency time.Duration, + retryOpts retry.Options, + numWorkers int, + topicNamer *TopicNamer, + pacerFactory func() *admission.Pacer, + timeSource timeutil.TimeSource, + metrics metricsRecorder, +) Sink { + sink := &batchingSink{ + client: client, + topicNamer: topicNamer, + concreteType: concreteType, + minFlushFrequency: minFlushFrequency, + ioWorkers: numWorkers, + retryOpts: retryOpts, + ts: timeSource, + metrics: metrics, + eventCh: make(chan interface{}, flushQueueDepth), + wg: ctxgroup.WithContext(ctx), + hasher: makeHasher(), + pacerFactory: pacerFactory, + pacer: pacerFactory(), + doneCh: make(chan struct{}), + } + + sink.wg.GoCtx(func(ctx context.Context) error { + sink.runBatchingWorker(ctx) + return nil + }) + return sink +} diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index fde56353bfa3..76032968ded5 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -564,12 +564,19 @@ func (ca *changeAggregator) tick() error { return ca.noteResolvedSpan(resolved) } case kvevent.TypeFlush: - return ca.sink.Flush(ca.Ctx()) + return ca.flushBufferedEvents() } return nil } +func (ca *changeAggregator) flushBufferedEvents() error { + if err := ca.eventConsumer.Flush(ca.Ctx()); err != nil { + return err + } + return ca.sink.Flush(ca.Ctx()) +} + // noteResolvedSpan periodically flushes Frontier progress from the current // changeAggregator node to the changeFrontier node to allow the changeFrontier // to persist the overall changefeed's progress @@ -621,7 +628,7 @@ func (ca *changeAggregator) flushFrontier() error { // otherwise, we could lose buffered messages and violate the // at-least-once guarantee. This is also true for checkpointing the // resolved spans in the job progress. 
- if err := ca.sink.Flush(ca.Ctx()); err != nil { + if err := ca.flushBufferedEvents(); err != nil { return err } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index a75f6635f598..ea1414a48dac 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -4877,17 +4877,17 @@ func TestChangefeedErrors(t *testing.T) { `webhook-https://fake-host`, ) sqlDB.ExpectErr( - t, `invalid option value webhook_sink_config, all config values must be non-negative`, + t, `invalid sink config, all values must be non-negative`, `CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Flush": {"Messages": -100, "Frequency": "1s"}}'`, `webhook-https://fake-host`, ) sqlDB.ExpectErr( - t, `invalid option value webhook_sink_config, all config values must be non-negative`, + t, `invalid sink config, all values must be non-negative`, `CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Flush": {"Messages": 100, "Frequency": "-1s"}}'`, `webhook-https://fake-host`, ) sqlDB.ExpectErr( - t, `invalid option value webhook_sink_config, flush frequency is not set, messages may never be sent`, + t, `invalid sink config, Flush.Frequency is not set, messages may never be sent`, `CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Flush": {"Messages": 100}}'`, `webhook-https://fake-host`, ) @@ -4907,17 +4907,17 @@ func TestChangefeedErrors(t *testing.T) { `webhook-https://fake-host`, ) sqlDB.ExpectErr( - t, `max retries must be either a positive int or 'inf' for infinite retries.`, + t, `Retry.Max must be either a positive int or 'inf' for infinite retries.`, `CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Retry": {"Max": "not valid"}}'`, `webhook-https://fake-host`, ) sqlDB.ExpectErr( - t, `max retry count must be a positive integer. use 'inf' for infinite retries.`, + t, `Retry.Max must be a positive integer. use 'inf' for infinite retries.`, `CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Retry": {"Max": 0}}'`, `webhook-https://fake-host`, ) sqlDB.ExpectErr( - t, `max retry count must be a positive integer. use 'inf' for infinite retries.`, + t, `Retry.Max must be a positive integer. use 'inf' for infinite retries.`, `CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Retry": {"Max": -1}}'`, `webhook-https://fake-host`, ) diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index abb8292b3867..9bd743c06ee2 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -262,9 +262,9 @@ var EventConsumerPacerRequestSize = settings.RegisterDurationSetting( settings.PositiveDuration, ) -// EventConsumerElasticCPUControlEnabled determines whether changefeed event +// PerEventElasticCPUControlEnabled determines whether changefeed event // processing integrates with elastic CPU control. 
-var EventConsumerElasticCPUControlEnabled = settings.RegisterBoolSetting( +var PerEventElasticCPUControlEnabled = settings.RegisterBoolSetting( settings.TenantWritable, "changefeed.cpu.per_event_elastic_control.enabled", "determines whether changefeed event processing integrates with elastic CPU control", @@ -281,3 +281,27 @@ var RequireExternalConnectionSink = settings.RegisterBoolSetting( " see https://www.cockroachlabs.com/docs/stable/create-external-connection.html", false, ) + +// SinkIOWorkers controls the number of IO workers used by sinks that use +// parallelIO to be able to send multiple requests in parallel. +var SinkIOWorkers = settings.RegisterIntSetting( + settings.TenantWritable, + "changefeed.sink_io_workers", + "the number of workers used by changefeeds when sending requests to the sink "+ + "(currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value.", + 0, +).WithPublic() + +// SinkPacerRequestSize specifies how often (measured in CPU time) +// that the Sink batching worker request CPU time from admission control. For +// example, every N milliseconds of CPU work, request N more milliseconds of CPU +// time. +var SinkPacerRequestSize = settings.RegisterDurationSetting( + settings.TenantWritable, + "changefeed.cpu.sink_encoding_allocation", + "an event consumer worker will perform a blocking request for CPU time "+ + "before consuming events. after fully utilizing this CPU time, it will "+ + "request more", + 50*time.Millisecond, + settings.PositiveDuration, +) diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index 2861515fe03e..7c44cf17d815 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -104,7 +104,7 @@ func newEventConsumer( } pacerRequestUnit := changefeedbase.EventConsumerPacerRequestSize.Get(&cfg.Settings.SV) - enablePacer := changefeedbase.EventConsumerElasticCPUControlEnabled.Get(&cfg.Settings.SV) + enablePacer := changefeedbase.PerEventElasticCPUControlEnabled.Get(&cfg.Settings.SV) makeConsumer := func(s EventSink, frontier frontier) (eventConsumer, error) { var err error @@ -182,7 +182,11 @@ func newEventConsumer( workerChSize: changefeedbase.EventConsumerWorkerQueueSize.Get(&cfg.Settings.SV), spanFrontier: spanFrontier, } - ss := &safeSink{wrapped: sink, beforeFlush: c.Flush} + ss := sink + // Only webhook supports concurrent EmitRow calls + if sink.getConcreteType() != sinkTypeWebhook { + ss = &safeSink{wrapped: sink} + } c.makeConsumer = func() (eventConsumer, error) { return makeConsumer(ss, c) } diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 1752392f5136..2e1f5a4f0483 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -854,6 +854,12 @@ func randomSinkTypeWithOptions(options feedTestOptions) string { for _, weight := range sinkWeights { weightTotal += weight } + if weightTotal == 0 { + // This exists for testing purposes, where one may want to run all tests on + // the same sink and set sinkWeights to be 1 only for that sink, but some + // tests explicitly disallow that sink and therefore have no valid sinks. 
+ return "skip" + } p := rand.Float32() * float32(weightTotal) var sum float32 = 0 for sink, weight := range sinkWeights { @@ -862,7 +868,7 @@ func randomSinkTypeWithOptions(options feedTestOptions) string { return sink } } - return "kafka" // unreachable + return "skip" // unreachable } // addCloudStorageOptions adds the options necessary to enable a server to run a @@ -982,6 +988,9 @@ func cdcTestNamedWithSystem( cleanupCloudStorage := addCloudStorageOptions(t, &options) sinkType := randomSinkTypeWithOptions(options) + if sinkType == "skip" { + return + } testLabel := sinkType if name != "" { testLabel = fmt.Sprintf("%s/%s", sinkType, name) diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index cfe92a54a13a..4d9539ebe213 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -32,6 +32,7 @@ const ( changefeedCheckpointHistMaxLatency = 30 * time.Second changefeedBatchHistMaxLatency = 30 * time.Second changefeedFlushHistMaxLatency = 1 * time.Minute + changefeedIOQueueMaxLatency = 5 * time.Minute admitLatencyMaxValue = 1 * time.Minute commitLatencyMaxValue = 10 * time.Minute ) @@ -55,6 +56,8 @@ type AggMetrics struct { Flushes *aggmetric.AggCounter FlushHistNanos *aggmetric.AggHistogram SizeBasedFlushes *aggmetric.AggCounter + ParallelIOQueueNanos *aggmetric.AggHistogram + SinkIOInflight *aggmetric.AggGauge CommitLatency *aggmetric.AggHistogram BackfillCount *aggmetric.AggGauge BackfillPendingRanges *aggmetric.AggGauge @@ -92,6 +95,8 @@ type metricsRecorder interface { getBackfillCallback() func() func() getBackfillRangeCallback() func(int64) (func(), func()) recordSizeBasedFlush() + recordParallelIOQueueLatency(time.Duration) + recordSinkIOInflightChange(int64) } var _ metricsRecorder = (*sliMetrics)(nil) @@ -111,6 +116,8 @@ type sliMetrics struct { Flushes *aggmetric.Counter FlushHistNanos *aggmetric.Histogram SizeBasedFlushes *aggmetric.Counter + ParallelIOQueueNanos *aggmetric.Histogram + SinkIOInflight *aggmetric.Gauge CommitLatency *aggmetric.Histogram ErrorRetries *aggmetric.Counter AdmitLatency *aggmetric.Histogram @@ -244,6 +251,20 @@ func (m *sliMetrics) recordSizeBasedFlush() { m.SizeBasedFlushes.Inc(1) } +func (m *sliMetrics) recordParallelIOQueueLatency(latency time.Duration) { + if m == nil { + return + } + m.ParallelIOQueueNanos.RecordValue(latency.Nanoseconds()) +} +func (m *sliMetrics) recordSinkIOInflightChange(delta int64) { + if m == nil { + return + } + + m.SinkIOInflight.Inc(delta) +} + type wrappingCostController struct { ctx context.Context inner metricsRecorder @@ -314,6 +335,14 @@ func (w *wrappingCostController) recordSizeBasedFlush() { w.inner.recordSizeBasedFlush() } +func (w *wrappingCostController) recordParallelIOQueueLatency(latency time.Duration) { + w.inner.recordParallelIOQueueLatency(latency) +} + +func (w *wrappingCostController) recordSinkIOInflightChange(delta int64) { + w.inner.recordSinkIOInflightChange(delta) +} + var ( metaChangefeedForwardedResolvedMessages = metric.Metadata{ Name: "changefeed.forwarded_resolved_messages", @@ -510,6 +539,18 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { Measurement: "Registrations", Unit: metric.Unit_COUNT, } + metaChangefeedParallelIOQueueNanos := metric.Metadata{ + Name: "changefeed.parallel_io_queue_nanos", + Help: "Time spent with outgoing requests to the sink waiting in queue due to inflight requests with conflicting keys", + Measurement: "Changefeeds", + Unit: metric.Unit_NANOSECONDS, + } + metaChangefeedSinkIOInflight 
:= metric.Metadata{ + Name: "changefeed.sink_io_inflight", + Help: "The number of keys currently inflight as IO requests being sent to the sink", + Measurement: "Messages", + Unit: metric.Unit_COUNT, + } // NB: When adding new histograms, use sigFigs = 1. Older histograms // retain significant figures of 2. b := aggmetric.MakeBuilder("scope") @@ -528,6 +569,14 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { FlushedBytes: b.Counter(metaChangefeedFlushedBytes), Flushes: b.Counter(metaChangefeedFlushes), SizeBasedFlushes: b.Counter(metaSizeBasedFlushes), + ParallelIOQueueNanos: b.Histogram(metric.HistogramOptions{ + Metadata: metaChangefeedParallelIOQueueNanos, + Duration: histogramWindow, + MaxVal: changefeedIOQueueMaxLatency.Nanoseconds(), + SigFigs: 2, + Buckets: metric.BatchProcessLatencyBuckets, + }), + SinkIOInflight: b.Gauge(metaChangefeedSinkIOInflight), BatchHistNanos: b.Histogram(metric.HistogramOptions{ Metadata: metaChangefeedBatchHistNanos, @@ -611,6 +660,8 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { Flushes: a.Flushes.AddChild(scope), FlushHistNanos: a.FlushHistNanos.AddChild(scope), SizeBasedFlushes: a.SizeBasedFlushes.AddChild(scope), + ParallelIOQueueNanos: a.ParallelIOQueueNanos.AddChild(scope), + SinkIOInflight: a.SinkIOInflight.AddChild(scope), CommitLatency: a.CommitLatency.AddChild(scope), ErrorRetries: a.ErrorRetries.AddChild(scope), AdmitLatency: a.AdmitLatency.AddChild(scope), diff --git a/pkg/ccl/changefeedccl/parallel_io.go b/pkg/ccl/changefeedccl/parallel_io.go new file mode 100644 index 000000000000..8a2e00705352 --- /dev/null +++ b/pkg/ccl/changefeedccl/parallel_io.go @@ -0,0 +1,282 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "sync" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/intsets" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// parallelIO allows performing blocking "IOHandler" calls on in parallel. +// IORequests implement a Keys() function returning keys on which ordering is +// preserved. +// Example: if the events [[a,b], [b,c], [c,d], [e,f]] are all submitted in that +// order, [a,b] and [e,f] can be emitted concurrentyl while [b,c] will block +// until [a,b] completes, then [c,d] will block until [b,c] completes. If [c,d] +// errored, [b,c] would never be sent, and an error would be returned with [c,d] +// in an ioResult struct sent to resultCh. After sending an error to resultCh +// all workers are torn down and no further requests are received or handled. +type parallelIO struct { + retryOpts retry.Options + wg ctxgroup.Group + metrics metricsRecorder + doneCh chan struct{} + + ioHandler IOHandler + + requestCh chan IORequest + resultCh chan *ioResult // readers should freeIOResult after handling result events +} + +// IORequest represents an abstract unit of IO that has a set of keys upon which +// sequential ordering of fulfillment must be enforced. 
+type IORequest interface { + Keys() intsets.Fast +} + +// ioResult stores the full request that was sent as well as an error if even +// after retries the IOHanlder was unable to succeed. +type ioResult struct { + request IORequest + err error +} + +var resultPool sync.Pool = sync.Pool{ + New: func() interface{} { + return new(ioResult) + }, +} + +func newIOResult(req IORequest, err error) *ioResult { + res := resultPool.Get().(*ioResult) + res.request = req + res.err = err + return res +} +func freeIOResult(e *ioResult) { + *e = ioResult{} + resultPool.Put(e) +} + +type queuedRequest struct { + req IORequest + admitTime time.Time +} + +// IOHandler performs a blocking IO operation on an IORequest +type IOHandler func(context.Context, IORequest) error + +func newParallelIO( + ctx context.Context, + retryOpts retry.Options, + numWorkers int, + handler IOHandler, + metrics metricsRecorder, +) *parallelIO { + wg := ctxgroup.WithContext(ctx) + io := ¶llelIO{ + retryOpts: retryOpts, + wg: wg, + metrics: metrics, + ioHandler: handler, + requestCh: make(chan IORequest, numWorkers), + resultCh: make(chan *ioResult, numWorkers), + doneCh: make(chan struct{}), + } + + wg.GoCtx(func(ctx context.Context) error { + return io.processIO(ctx, numWorkers) + }) + + return io +} + +// Close stops all workers immediately and returns once they shut down. Inflight +// requests sent to requestCh may never result in being sent to resultCh. +func (p *parallelIO) Close() { + close(p.doneCh) + _ = p.wg.Wait() +} + +// processIO starts numEmitWorkers worker threads to run the IOHandler on +// non-conflicting IORequests each retrying according to the retryOpts, then: +// - Reads incoming messages from requestCh, sending them to any worker if there +// aren't any conflicting messages and queing them up to be sent later +// otherwise. +// - Reads results from the workers and forwards the information to resultCh, at +// this point also sending the first pending request that would now be sendable. +// +// ┌───────────┐ +// ┌►│io worker 1├──┐ +// │ └───────────┘ │ +// <-requestCh──────────┤► ... ├─┬─► resultCh<- +// │ ▲ │ ┌───────────┐ │ │ +// │conflict │ └►│io worker n├──┘ │ +// │ │ └───────────┘ │ +// ▼ │ │check pending +// ┌─────────┐ │no conflict │ +// │ pending ├───────┘ │ +// └─────────┘ │ +// ▲ │ +// └──────────────────────────────────┘ +// +// The conflict checking is done via an intset.Fast storing the union of all +// keys currently being sent, followed by checking each pending batch's intset. +func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error { + emitWithRetries := func(ctx context.Context, payload IORequest) error { + initialSend := true + return retry.WithMaxAttempts(ctx, p.retryOpts, p.retryOpts.MaxRetries+1, func() error { + if !initialSend { + p.metrics.recordInternalRetry(int64(payload.Keys().Len()), false) + } + initialSend = false + return p.ioHandler(ctx, payload) + }) + } + + // Multiple worker routines handle the IO operations, retrying when necessary. 
+ workerEmitCh := make(chan IORequest, numEmitWorkers) + defer close(workerEmitCh) + workerResultCh := make(chan *ioResult, numEmitWorkers) + + for i := 0; i < numEmitWorkers; i++ { + p.wg.GoCtx(func(ctx context.Context) error { + for req := range workerEmitCh { + result := newIOResult(req, emitWithRetries(ctx, req)) + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.doneCh: + return nil + case workerResultCh <- result: + } + } + return nil + }) + } + + // submitIO, which sends requests to the workers, has to select on both + // workerEmitCh<- and <-workerResultCh, since without the <-workerResultCh + // there could be a deadlock where all workers are blocked on emitting results + // and thereby unable to accept new work. If a result is received, it also + // cannot immediately call handleResult on it as that could cause a re-entrant + // submitIO -> handleResult -> submitIO -> handleResult chain which is complex + // to manage. To avoid this, results are added to a pending list to be handled + // separately. + var pendingResults []*ioResult + submitIO := func(req IORequest) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.doneCh: + return nil + case workerEmitCh <- req: + return nil + case res := <-workerResultCh: + pendingResults = append(pendingResults, res) + } + } + } + + // The main routine keeps track of incoming and completed requests, where + // admitted requests yet to be completed have their Keys() tracked in an + // intset, and any incoming request with keys already in the intset are placed + // in a Queue to be sent to IO workers once the conflicting requests complete. + var inflight intsets.Fast + var pending []queuedRequest + + handleResult := func(res *ioResult) error { + if res.err == nil { + // Clear out the completed keys to check for newly valid pending requests. + inflight.DifferenceWith(res.request.Keys()) + + // Check for a pending request that is now able to be sent i.e. is not + // conflicting with any inflight requests or any requests that arrived + // earlier than itself in the pending queue. + pendingKeys := intsets.Fast{} + for i, pendingReq := range pending { + if !inflight.Intersects(pendingReq.req.Keys()) && !pendingKeys.Intersects(pendingReq.req.Keys()) { + inflight.UnionWith(pendingReq.req.Keys()) + pending = append(pending[:i], pending[i+1:]...) + p.metrics.recordParallelIOQueueLatency(timeutil.Since(pendingReq.admitTime)) + if err := submitIO(pendingReq.req); err != nil { + return err + } + break + } + + pendingKeys.UnionWith(pendingReq.req.Keys()) + } + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.doneCh: + return nil + case p.resultCh <- res: + return nil + } + } + + // If any yet-to-be-handled request observed so far shares any keys with an + // incoming request, the incoming request cannot be sent out and risk arriving + // earlier. + canSendKeys := func(keys intsets.Fast) bool { + if inflight.Intersects(keys) { + return true + } + for _, pendingReq := range pending { + if pendingReq.req.Keys().Intersects(keys) { + return true + } + } + return false + } + + for { + // Handle any results that arrived during any submitIO attempts + unhandled := pendingResults + pendingResults = nil + for _, res := range unhandled { + if err := handleResult(res); err != nil { + return err + } + } + + select { + case req := <-p.requestCh: + if canSendKeys(req.Keys()) { + // If a request conflicts with any currently unhandled requests, add it + // to the pending queue to be rechecked for validity later. 
+ pending = append(pending, queuedRequest{req: req, admitTime: timeutil.Now()}) + } else { + inflight.UnionWith(req.Keys()) + if err := submitIO(req); err != nil { + return err + } + } + case res := <-workerResultCh: + if err := handleResult(res); err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + case <-p.doneCh: + return nil + } + } +} diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 70eff852d348..12d41d2b0eb3 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -10,7 +10,11 @@ package changefeedccl import ( "context" + "encoding/json" + "math" "net/url" + "runtime" + "strconv" "strings" "time" @@ -19,13 +23,19 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/admission" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -146,6 +156,16 @@ func getAndDialSink( return sink, sink.Dial() } +// NewWebhookSinkEnabled determines whether or not the refactored Webhook sink +// or the deprecated sink should be used. 
+var NewWebhookSinkEnabled = settings.RegisterBoolSetting( + settings.TenantWritable, + "changefeed.new_webhook_sink_enabled", + "if enabled, this setting enables a new implementation of the webhook sink"+ + " that allows for a much higher throughput", + util.ConstantWithMetamorphicTestBool("changefeed.new_webhook_sink_enabled", false), +) + func getSink( ctx context.Context, serverCfg *execinfra.ServerConfig, @@ -207,10 +227,17 @@ func getSink( if err != nil { return nil, err } - return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) { - return makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts, - defaultWorkerCount(), timeutil.DefaultTimeSource{}, metricsBuilder) - }) + if NewWebhookSinkEnabled.Get(&serverCfg.Settings.SV) { + return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) { + return makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts, + numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{}, metricsBuilder) + }) + } else { + return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) { + return makeDeprecatedWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts, + defaultWorkerCount(), timeutil.DefaultTimeSource{}, metricsBuilder) + }) + } case isPubsubSink(u): // TODO: add metrics to pubsubsink return MakePubsubSink(ctx, u, encodingOpts, AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered)) @@ -606,12 +633,10 @@ func (n *nullSink) Dial() error { } // safeSink wraps an EventSink in a mutex so it's methods are -// thread safe. It also has a beforeFlush hook which is called -// at the beginning of safeSink.Flush(). +// thread safe. type safeSink struct { syncutil.Mutex - beforeFlush func(ctx context.Context) error - wrapped EventSink + wrapped EventSink } var _ EventSink = (*safeSink)(nil) @@ -645,9 +670,6 @@ func (s *safeSink) EmitRow( } func (s *safeSink) Flush(ctx context.Context) error { - if err := s.beforeFlush(ctx); err != nil { - return err - } s.Lock() defer s.Unlock() return s.wrapped.Flush(ctx) @@ -673,3 +695,161 @@ type SinkWithEncoder interface { Flush(ctx context.Context) error } + +// proper JSON schema for sink config: +// +// { +// "Flush": { +// "Messages": ..., +// "Bytes": ..., +// "Frequency": ..., +// }, +// "Retry": { +// "Max": ..., +// "Backoff": ..., +// } +// } +type sinkJSONConfig struct { + Flush sinkBatchConfig `json:",omitempty"` + Retry sinkRetryConfig `json:",omitempty"` +} + +type sinkBatchConfig struct { + Bytes, Messages int `json:",omitempty"` + Frequency jsonDuration `json:",omitempty"` +} + +// wrapper structs to unmarshal json, retry.Options will be the actual config +type sinkRetryConfig struct { + Max jsonMaxRetries `json:",omitempty"` + Backoff jsonDuration `json:",omitempty"` +} + +func defaultRetryConfig() retry.Options { + opts := retry.Options{ + InitialBackoff: 500 * time.Millisecond, + MaxRetries: 3, + Multiplier: 2, + } + // max backoff should be initial * 2 ^ maxRetries + opts.MaxBackoff = opts.InitialBackoff * time.Duration(int(math.Pow(2.0, float64(opts.MaxRetries)))) + return opts +} + +func getSinkConfigFromJson( + jsonStr changefeedbase.SinkSpecificJSONConfig, +) (batchCfg sinkBatchConfig, retryCfg retry.Options, err error) { + retryCfg = defaultRetryConfig() + + var cfg = sinkJSONConfig{} + cfg.Retry.Max = jsonMaxRetries(retryCfg.MaxRetries) + cfg.Retry.Backoff = jsonDuration(retryCfg.InitialBackoff) + if jsonStr != `` { + // set retry defaults to 
be overridden if included in JSON + if err = json.Unmarshal([]byte(jsonStr), &cfg); err != nil { + return batchCfg, retryCfg, errors.Wrapf(err, "error unmarshalling json") + } + } + + // don't support negative values + if cfg.Flush.Messages < 0 || cfg.Flush.Bytes < 0 || cfg.Flush.Frequency < 0 || + cfg.Retry.Max < 0 || cfg.Retry.Backoff < 0 { + return batchCfg, retryCfg, errors.Errorf("invalid sink config, all values must be non-negative") + } + + // errors if other batch values are set, but frequency is not + if (cfg.Flush.Messages > 0 || cfg.Flush.Bytes > 0) && cfg.Flush.Frequency == 0 { + return batchCfg, retryCfg, errors.Errorf("invalid sink config, Flush.Frequency is not set, messages may never be sent") + } + + retryCfg.MaxRetries = int(cfg.Retry.Max) + retryCfg.InitialBackoff = time.Duration(cfg.Retry.Backoff) + return cfg.Flush, retryCfg, nil +} + +type jsonMaxRetries int + +func (j *jsonMaxRetries) UnmarshalJSON(b []byte) error { + var i int64 + // try to parse as int + i, err := strconv.ParseInt(string(b), 10, 64) + if err == nil { + if i <= 0 { + return errors.Errorf("Retry.Max must be a positive integer. use 'inf' for infinite retries.") + } + *j = jsonMaxRetries(i) + } else { + // if that fails, try to parse as string (only accept 'inf') + var s string + // using unmarshal here to remove quotes around the string + if err := json.Unmarshal(b, &s); err != nil { + return errors.Errorf("Retry.Max must be either a positive int or 'inf' for infinite retries.") + } + if strings.ToLower(s) == "inf" { + // if used wants infinite retries, set to zero as retry.Options interprets this as infinity + *j = 0 + } else if n, err := strconv.Atoi(s); err == nil { // also accept ints as strings + *j = jsonMaxRetries(n) + } else { + return errors.Errorf("Retry.Max must be either a positive int or 'inf' for infinite retries.") + } + } + return nil +} + +func numSinkIOWorkers(cfg *execinfra.ServerConfig) int { + numWorkers := changefeedbase.SinkIOWorkers.Get(&cfg.Settings.SV) + if numWorkers > 0 { + return int(numWorkers) + } + + idealNumber := runtime.GOMAXPROCS(0) + if idealNumber < 1 { + return 1 + } + if idealNumber > 32 { + return 32 + } + return idealNumber +} + +func newCPUPacerFactory(ctx context.Context, cfg *execinfra.ServerConfig) func() *admission.Pacer { + var prevPacer *admission.Pacer + var prevRequestUnit time.Duration + + return func() *admission.Pacer { + pacerRequestUnit := changefeedbase.SinkPacerRequestSize.Get(&cfg.Settings.SV) + enablePacer := changefeedbase.PerEventElasticCPUControlEnabled.Get(&cfg.Settings.SV) + + if enablePacer && prevPacer != nil && prevRequestUnit == pacerRequestUnit { + return prevPacer + } + + var pacer *admission.Pacer = nil + if enablePacer { + tenantID, ok := roachpb.ClientTenantFromContext(ctx) + if !ok { + tenantID = roachpb.SystemTenantID + } + + pacer = cfg.AdmissionPacerFactory.NewPacer( + pacerRequestUnit, + admission.WorkInfo{ + TenantID: tenantID, + Priority: admissionpb.BulkNormalPri, + CreateTime: timeutil.Now().UnixNano(), + BypassAdmission: false, + }, + ) + } + + prevPacer = pacer + prevRequestUnit = pacerRequestUnit + + return pacer + } +} + +func nilPacerFactory() *admission.Pacer { + return nil +} diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go index eb663f021efc..25ae999c29ea 100644 --- a/pkg/ccl/changefeedccl/sink_test.go +++ b/pkg/ccl/changefeedccl/sink_test.go @@ -764,3 +764,86 @@ func TestKafkaSinkTracksMemory(t *testing.T) { require.EqualValues(t, 0, p.outstanding()) require.EqualValues(t, 0, 
pool.used()) } + +func TestSinkConfigParsing(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + t.Run("handles valid types", func(t *testing.T) { + opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234, "Frequency": "3s", "Bytes":30}, "Retry": {"Max": 5, "Backoff": "3h"}}`) + batch, retry, err := getSinkConfigFromJson(opts) + require.NoError(t, err) + require.Equal(t, batch, sinkBatchConfig{ + Bytes: 30, + Messages: 1234, + Frequency: jsonDuration(3 * time.Second), + }) + require.Equal(t, retry.MaxRetries, 5) + require.Equal(t, retry.InitialBackoff, 3*time.Hour) + + // Max accepts both values and specifically the string "inf" + opts = changefeedbase.SinkSpecificJSONConfig(`{"Retry": {"Max": "inf"}}`) + _, retry, err = getSinkConfigFromJson(opts) + require.NoError(t, err) + require.Equal(t, retry.MaxRetries, 0) + }) + + t.Run("provides retry defaults", func(t *testing.T) { + defaultRetry := defaultRetryConfig() + + opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234, "Frequency": "3s"}}`) + _, retry, err := getSinkConfigFromJson(opts) + require.NoError(t, err) + + require.Equal(t, retry.MaxRetries, defaultRetry.MaxRetries) + require.Equal(t, retry.InitialBackoff, defaultRetry.InitialBackoff) + + opts = changefeedbase.SinkSpecificJSONConfig(`{"Retry": {"Max": "inf"}}`) + _, retry, err = getSinkConfigFromJson(opts) + require.NoError(t, err) + require.Equal(t, retry.MaxRetries, 0) + require.Equal(t, retry.InitialBackoff, defaultRetry.InitialBackoff) + }) + + t.Run("errors on invalid configuration", func(t *testing.T) { + opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": -1234, "Frequency": "3s"}}`) + _, _, err := getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "invalid sink config, all values must be non-negative") + + opts = changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234, "Frequency": "-3s"}}`) + _, _, err = getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "invalid sink config, all values must be non-negative") + + opts = changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 10}}`) + _, _, err = getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "invalid sink config, Flush.Frequency is not set, messages may never be sent") + }) + + t.Run("errors on invalid type", func(t *testing.T) { + opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": "1234", "Frequency": "3s", "Bytes":30}}`) + _, _, err := getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "Flush.Messages of type int") + + opts = changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234, "Frequency": "3s", "Bytes":"30"}}`) + _, _, err = getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "Flush.Bytes of type int") + + opts = changefeedbase.SinkSpecificJSONConfig(`{"Retry": {"Max": true}}`) + _, _, err = getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "Retry.Max must be either a positive int or 'inf'") + + opts = changefeedbase.SinkSpecificJSONConfig(`{"Retry": {"Max": "not-inf"}}`) + _, _, err = getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "Retry.Max must be either a positive int or 'inf'") + }) + + t.Run("errors on malformed json", func(t *testing.T) { + opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234 "Frequency": "3s"}}`) + _, _, err := getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "invalid character '\"'") + + opts = changefeedbase.SinkSpecificJSONConfig(`string`) + _, _, 
err = getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "invalid character 's' looking for beginning of value") + }) +} diff --git a/pkg/ccl/changefeedccl/sink_webhook.go b/pkg/ccl/changefeedccl/sink_webhook.go index 09fd6cb508a2..f2ca73d125de 100644 --- a/pkg/ccl/changefeedccl/sink_webhook.go +++ b/pkg/ccl/changefeedccl/sink_webhook.go @@ -17,11 +17,9 @@ import ( "fmt" "hash/crc32" "io" - "math" "net" "net/http" "net/url" - "strconv" "strings" "time" @@ -36,23 +34,7 @@ import ( "github.com/cockroachdb/errors" ) -const ( - applicationTypeJSON = `application/json` - applicationTypeCSV = `text/csv` - authorizationHeader = `Authorization` -) - -func isWebhookSink(u *url.URL) bool { - switch u.Scheme { - // allow HTTP here but throw an error later to make it clear HTTPS is required - case changefeedbase.SinkSchemeWebhookHTTP, changefeedbase.SinkSchemeWebhookHTTPS: - return true - default: - return false - } -} - -type webhookSink struct { +type deprecatedWebhookSink struct { // Webhook configuration. parallelism int retryCfg retry.Options @@ -80,11 +62,11 @@ type webhookSink struct { workerCtx context.Context workerGroup ctxgroup.Group exitWorkers func() // Signaled to shut down all workers. - eventsChans []chan []messagePayload + eventsChans []chan []deprecatedMessagePayload metrics metricsRecorder } -func (s *webhookSink) getConcreteType() sinkType { +func (s *deprecatedWebhookSink) getConcreteType() sinkType { return sinkTypeWebhook } @@ -100,7 +82,7 @@ type encodedPayload struct { mvcc hlc.Timestamp } -func encodePayloadJSONWebhook(messages []messagePayload) (encodedPayload, error) { +func encodePayloadJSONWebhook(messages []deprecatedMessagePayload) (encodedPayload, error) { result := encodedPayload{ emitTime: timeutil.Now(), } @@ -129,7 +111,7 @@ func encodePayloadJSONWebhook(messages []messagePayload) (encodedPayload, error) return result, err } -func encodePayloadCSVWebhook(messages []messagePayload) (encodedPayload, error) { +func encodePayloadCSVWebhook(messages []deprecatedMessagePayload) (encodedPayload, error) { result := encodedPayload{ emitTime: timeutil.Now(), } @@ -150,7 +132,7 @@ func encodePayloadCSVWebhook(messages []messagePayload) (encodedPayload, error) return result, nil } -type messagePayload struct { +type deprecatedMessagePayload struct { // Payload message fields. key []byte val []byte @@ -162,15 +144,15 @@ type messagePayload struct { // webhookMessage contains either messagePayload or a flush request. type webhookMessage struct { flushDone *chan struct{} - payload messagePayload + payload deprecatedMessagePayload } type batch struct { - buffer []messagePayload + buffer []deprecatedMessagePayload bufferBytes int } -func (b *batch) addToBuffer(m messagePayload) { +func (b *batch) addToBuffer(m deprecatedMessagePayload) { b.bufferBytes += len(m.val) b.buffer = append(b.buffer, m) } @@ -185,36 +167,6 @@ type batchConfig struct { Frequency jsonDuration `json:",omitempty"` } -type jsonMaxRetries int - -func (j *jsonMaxRetries) UnmarshalJSON(b []byte) error { - var i int64 - // try to parse as int - i, err := strconv.ParseInt(string(b), 10, 64) - if err == nil { - if i <= 0 { - return errors.Errorf("max retry count must be a positive integer. 
use 'inf' for infinite retries.") - } - *j = jsonMaxRetries(i) - } else { - // if that fails, try to parse as string (only accept 'inf') - var s string - // using unmarshal here to remove quotes around the string - if err := json.Unmarshal(b, &s); err != nil { - return err - } - if strings.ToLower(s) == "inf" { - // if used wants infinite retries, set to zero as retry.Options interprets this as infinity - *j = 0 - } else if n, err := strconv.Atoi(s); err == nil { // also accept ints as strings - *j = jsonMaxRetries(n) - } else { - return errors.Errorf("max retries must be either a positive int or 'inf' for infinite retries.") - } - } - return nil -} - // wrapper structs to unmarshal json, retry.Options will be the actual config type retryConfig struct { Max jsonMaxRetries `json:",omitempty"` @@ -239,7 +191,7 @@ type webhookSinkConfig struct { Retry retryConfig `json:",omitempty"` } -func (s *webhookSink) getWebhookSinkConfig( +func (s *deprecatedWebhookSink) getWebhookSinkConfig( jsonStr changefeedbase.SinkSpecificJSONConfig, ) (batchCfg batchConfig, retryCfg retry.Options, err error) { retryCfg = defaultRetryConfig() @@ -257,12 +209,12 @@ func (s *webhookSink) getWebhookSinkConfig( // don't support negative values if cfg.Flush.Messages < 0 || cfg.Flush.Bytes < 0 || cfg.Flush.Frequency < 0 || cfg.Retry.Max < 0 || cfg.Retry.Backoff < 0 { - return batchCfg, retryCfg, errors.Errorf("invalid option value %s, all config values must be non-negative", changefeedbase.OptWebhookSinkConfig) + return batchCfg, retryCfg, errors.Errorf("invalid sink config, all values must be non-negative") } // errors if other batch values are set, but frequency is not if (cfg.Flush.Messages > 0 || cfg.Flush.Bytes > 0) && cfg.Flush.Frequency == 0 { - return batchCfg, retryCfg, errors.Errorf("invalid option value %s, flush frequency is not set, messages may never be sent", changefeedbase.OptWebhookSinkConfig) + return batchCfg, retryCfg, errors.Errorf("invalid sink config, Flush.Frequency is not set, messages may never be sent") } retryCfg.MaxRetries = int(cfg.Retry.Max) @@ -270,7 +222,7 @@ func (s *webhookSink) getWebhookSinkConfig( return cfg.Flush, retryCfg, nil } -func makeWebhookSink( +func makeDeprecatedWebhookSink( ctx context.Context, u sinkURL, encodingOpts changefeedbase.EncodingOptions, @@ -312,7 +264,7 @@ func makeWebhookSink( ctx, cancel := context.WithCancel(ctx) - sink := &webhookSink{ + sink := &deprecatedWebhookSink{ workerCtx: ctx, authHeader: opts.AuthHeader, exitWorkers: cancel, @@ -329,7 +281,7 @@ func makeWebhookSink( } // TODO(yevgeniy): Establish HTTP connection in Dial(). 
- sink.client, err = makeWebhookClient(u, connTimeout) + sink.client, err = deprecatedMakeWebhookClient(u, connTimeout) if err != nil { return nil, err } @@ -350,7 +302,7 @@ func makeWebhookSink( return sink, nil } -func makeWebhookClient(u sinkURL, timeout time.Duration) (*httputil.Client, error) { +func deprecatedMakeWebhookClient(u sinkURL, timeout time.Duration) (*httputil.Client, error) { client := &httputil.Client{ Client: &http.Client{ Timeout: timeout, @@ -417,30 +369,19 @@ func makeWebhookClient(u sinkURL, timeout time.Duration) (*httputil.Client, erro return client, nil } -func defaultRetryConfig() retry.Options { - opts := retry.Options{ - InitialBackoff: 500 * time.Millisecond, - MaxRetries: 3, - Multiplier: 2, - } - // max backoff should be initial * 2 ^ maxRetries - opts.MaxBackoff = opts.InitialBackoff * time.Duration(int(math.Pow(2.0, float64(opts.MaxRetries)))) - return opts -} - // defaultWorkerCount() is the number of CPU's on the machine func defaultWorkerCount() int { return system.NumCPU() } -func (s *webhookSink) Dial() error { +func (s *deprecatedWebhookSink) Dial() error { s.setupWorkers() return nil } -func (s *webhookSink) setupWorkers() { +func (s *deprecatedWebhookSink) setupWorkers() { // setup events channels to send to workers and the worker group - s.eventsChans = make([]chan []messagePayload, s.parallelism) + s.eventsChans = make([]chan []deprecatedMessagePayload, s.parallelism) s.workerGroup = ctxgroup.WithContext(s.workerCtx) s.batchChan = make(chan webhookMessage) @@ -455,7 +396,7 @@ func (s *webhookSink) setupWorkers() { return nil }) for i := 0; i < s.parallelism; i++ { - s.eventsChans[i] = make(chan []messagePayload) + s.eventsChans[i] = make(chan []deprecatedMessagePayload) j := i s.workerGroup.GoCtx(func(ctx context.Context) error { s.workerLoop(j) @@ -464,7 +405,7 @@ func (s *webhookSink) setupWorkers() { } } -func (s *webhookSink) shouldSendBatch(b batch) bool { +func (s *deprecatedWebhookSink) shouldSendBatch(b batch) bool { // similar to sarama, send batch if: // everything is zero (default) // any one of the conditions are met UNLESS the condition is zero which means never batch @@ -483,8 +424,8 @@ func (s *webhookSink) shouldSendBatch(b batch) bool { } } -func (s *webhookSink) splitAndSendBatch(batch []messagePayload) error { - workerBatches := make([][]messagePayload, s.parallelism) +func (s *deprecatedWebhookSink) splitAndSendBatch(batch []deprecatedMessagePayload) error { + workerBatches := make([][]deprecatedMessagePayload, s.parallelism) for _, msg := range batch { // split batch into per-worker batches i := s.workerIndex(msg.key) @@ -504,7 +445,7 @@ func (s *webhookSink) splitAndSendBatch(batch []messagePayload) error { } // flushWorkers sends flush request to each worker and waits for each one to acknowledge. -func (s *webhookSink) flushWorkers(done chan struct{}) error { +func (s *deprecatedWebhookSink) flushWorkers(done chan struct{}) error { for i := 0; i < len(s.eventsChans); i++ { // Ability to write a nil message to events channel indicates that // the worker has processed all other messages. 
@@ -525,7 +466,7 @@ func (s *webhookSink) flushWorkers(done chan struct{}) error { // batchWorker ingests messages from EmitRow into a batch and splits them into // per-worker batches to be sent separately -func (s *webhookSink) batchWorker() { +func (s *deprecatedWebhookSink) batchWorker() { var batchTracker batch batchTimer := s.ts.NewTimer() defer batchTimer.Stop() @@ -578,7 +519,7 @@ func (s *webhookSink) batchWorker() { } } -func (s *webhookSink) workerLoop(workerIndex int) { +func (s *deprecatedWebhookSink) workerLoop(workerIndex int) { for { select { case <-s.workerCtx.Done(): @@ -613,14 +554,14 @@ func (s *webhookSink) workerLoop(workerIndex int) { } } -func (s *webhookSink) sendMessageWithRetries(ctx context.Context, reqBody []byte) error { +func (s *deprecatedWebhookSink) sendMessageWithRetries(ctx context.Context, reqBody []byte) error { requestFunc := func() error { return s.sendMessage(ctx, reqBody) } return retry.WithMaxAttempts(ctx, s.retryCfg, s.retryCfg.MaxRetries+1, requestFunc) } -func (s *webhookSink) sendMessage(ctx context.Context, reqBody []byte) error { +func (s *deprecatedWebhookSink) sendMessage(ctx context.Context, reqBody []byte) error { req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url.String(), bytes.NewReader(reqBody)) if err != nil { return err @@ -658,13 +599,13 @@ func (s *webhookSink) sendMessage(ctx context.Context, reqBody []byte) error { // deterministically assigned to the same worker. Since we have a channel per // worker, we can ensure per-worker ordering and therefore guarantee per-key // ordering. -func (s *webhookSink) workerIndex(key []byte) uint32 { +func (s *deprecatedWebhookSink) workerIndex(key []byte) uint32 { return crc32.ChecksumIEEE(key) % uint32(s.parallelism) } // exitWorkersWithError saves the first error message encountered by webhook workers, // and requests all workers to terminate. -func (s *webhookSink) exitWorkersWithError(err error) { +func (s *deprecatedWebhookSink) exitWorkersWithError(err error) { // errChan has buffer size 1, first error will be saved to the buffer and // subsequent errors will be ignored select { @@ -675,7 +616,7 @@ func (s *webhookSink) exitWorkersWithError(err error) { } // sinkError checks to see if any errors occurred inside workers go routines. -func (s *webhookSink) sinkError() error { +func (s *deprecatedWebhookSink) sinkError() error { select { case err := <-s.errChan: return err @@ -684,7 +625,7 @@ func (s *webhookSink) sinkError() error { } } -func (s *webhookSink) EmitRow( +func (s *deprecatedWebhookSink) EmitRow( ctx context.Context, topic TopicDescriptor, key, value []byte, @@ -702,7 +643,7 @@ func (s *webhookSink) EmitRow( case err := <-s.errChan: return err case s.batchChan <- webhookMessage{ - payload: messagePayload{ + payload: deprecatedMessagePayload{ key: key, val: value, alloc: alloc, @@ -714,7 +655,7 @@ func (s *webhookSink) EmitRow( return nil } -func (s *webhookSink) EmitResolvedTimestamp( +func (s *deprecatedWebhookSink) EmitResolvedTimestamp( ctx context.Context, encoder Encoder, resolved hlc.Timestamp, ) error { defer s.metrics.recordResolvedCallback()() @@ -745,7 +686,7 @@ func (s *webhookSink) EmitResolvedTimestamp( return nil } -func (s *webhookSink) Flush(ctx context.Context) error { +func (s *deprecatedWebhookSink) Flush(ctx context.Context) error { s.metrics.recordFlushRequestCallback()() // Send flush request. 
@@ -768,7 +709,7 @@ func (s *webhookSink) Flush(ctx context.Context) error { } } -func (s *webhookSink) Close() error { +func (s *deprecatedWebhookSink) Close() error { s.exitWorkers() // ignore errors here since we're closing the sink anyway _ = s.workerGroup.Wait() diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go index 199734be6bf5..65356260c7e6 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_test.go +++ b/pkg/ccl/changefeedccl/sink_webhook_test.go @@ -15,6 +15,7 @@ import ( "net/http" "net/url" "strings" + "sync/atomic" "testing" "time" @@ -83,7 +84,7 @@ func setupWebhookSinkWithDetails( if err != nil { return nil, err } - sinkSrc, err := makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, sinkOpts, parallelism, source, nilMetricsRecorderBuilder) + sinkSrc, err := makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, sinkOpts, parallelism, nilPacerFactory, source, nilMetricsRecorderBuilder) if err != nil { return nil, err } @@ -172,8 +173,6 @@ func TestWebhookSink(t *testing.T) { require.NoError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) require.Regexp(t, "x509", sinkSrcNoCert.Flush(context.Background())) - require.EqualError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), - `context canceled`) params.Set(changefeedbase.SinkParamSkipTLSVerify, "true") sinkDestHost.RawQuery = params.Encode() @@ -191,8 +190,6 @@ func TestWebhookSink(t *testing.T) { err = sinkSrc.Flush(context.Background()) require.Error(t, err) require.Contains(t, err.Error(), fmt.Sprintf(`Post "%s":`, sinkDest.URL())) - require.EqualError(t, sinkSrc.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), - `context canceled`) sinkDestHTTP, err := cdctest.StartMockWebhookSinkInsecure() require.NoError(t, err) @@ -207,8 +204,6 @@ func TestWebhookSink(t *testing.T) { require.EqualError(t, sinkSrcWrongProtocol.Flush(context.Background()), fmt.Sprintf(`Post "%s": http: server gave HTTP response to HTTPS client`, fmt.Sprintf("https://%s", strings.TrimPrefix(sinkDestHTTP.URL(), "http://")))) - require.EqualError(t, sinkSrcWrongProtocol.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), - `context canceled`) sinkDestSecure, err := cdctest.StartMockWebhookSinkSecure(cert) require.NoError(t, err) @@ -230,6 +225,7 @@ func TestWebhookSink(t *testing.T) { Opts: opts, } + require.NoError(t, sinkSrc.Close()) sinkSrc, err = setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{}) require.NoError(t, err) @@ -305,8 +301,6 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) { require.NoError(t, sinkSrcNoCreds.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) require.EqualError(t, sinkSrcNoCreds.Flush(context.Background()), "401 Unauthorized: ") - require.EqualError(t, sinkSrcNoCreds.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), - `context canceled`) // wrong credentials should result in a 401 as well var wrongAuthHeader string @@ -318,8 +312,6 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) { require.NoError(t, sinkSrcWrongCreds.EmitRow(context.Background(), nil, []byte("[1001]"), 
[]byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) require.EqualError(t, sinkSrcWrongCreds.Flush(context.Background()), "401 Unauthorized: ") - require.EqualError(t, sinkSrcWrongCreds.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), - `context canceled`) require.NoError(t, sinkSrc.Close()) require.NoError(t, sinkSrcNoCreds.Close()) @@ -603,6 +595,13 @@ func TestWebhookSinkConfig(t *testing.T) { sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, mt) require.NoError(t, err) + batchingSink, ok := sinkSrc.(*batchingSink) + require.True(t, ok) + var appendCount int32 = 0 + batchingSink.knobs.OnAppend = func(event *rowEvent) { + atomic.AddInt32(&appendCount, 1) + } + // send incomplete batch require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) @@ -611,11 +610,10 @@ func TestWebhookSinkConfig(t *testing.T) { require.Equal(t, sinkDest.Latest(), "") testutils.SucceedsSoon(t, func() error { - // wait for the timer in batch worker to be set (1 hour from now, as specified by config) before advancing time. - if len(mt.Timers()) == 1 && mt.Timers()[0] == mt.Now().Add(time.Hour) { + if atomic.LoadInt32(&appendCount) >= 2 { return nil } - return errors.New("Waiting for timer to be created by batch worker") + return errors.New("Waiting for rows to be buffered") }) mt.Advance(time.Hour) require.NoError(t, sinkSrc.Flush(context.Background())) diff --git a/pkg/ccl/changefeedccl/sink_webhook_v2.go b/pkg/ccl/changefeedccl/sink_webhook_v2.go new file mode 100644 index 000000000000..e6744b0815be --- /dev/null +++ b/pkg/ccl/changefeedccl/sink_webhook_v2.go @@ -0,0 +1,380 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "bytes" + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/util/admission" + "github.com/cockroachdb/cockroach/pkg/util/httputil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" +) + +const ( + applicationTypeJSON = `application/json` + applicationTypeCSV = `text/csv` + authorizationHeader = `Authorization` +) + +func isWebhookSink(u *url.URL) bool { + switch u.Scheme { + // allow HTTP here but throw an error later to make it clear HTTPS is required + case changefeedbase.SinkSchemeWebhookHTTP, changefeedbase.SinkSchemeWebhookHTTPS: + return true + default: + return false + } +} + +type webhookSinkClient struct { + ctx context.Context + format changefeedbase.FormatType + url sinkURL + authHeader string + batchCfg sinkBatchConfig + client *httputil.Client +} + +var _ SinkClient = (*webhookSinkClient)(nil) + +func makeWebhookSinkClient( + ctx context.Context, + u sinkURL, + encodingOpts changefeedbase.EncodingOptions, + opts changefeedbase.WebhookSinkOptions, + batchCfg sinkBatchConfig, + parallelism int, +) (SinkClient, error) { + err := validateWebhookOpts(u, encodingOpts, opts) + if err != nil { + return nil, err + } + + u.Scheme = strings.TrimPrefix(u.Scheme, `webhook-`) + + sinkClient := &webhookSinkClient{ + ctx: ctx, + authHeader: opts.AuthHeader, + format: encodingOpts.Format, + batchCfg: batchCfg, + } + + var connTimeout time.Duration + if opts.ClientTimeout != nil { + connTimeout = *opts.ClientTimeout + } + sinkClient.client, err = makeWebhookClient(u, connTimeout, parallelism) + if err != nil { + return nil, err + } + + // remove known query params from sink URL before setting in sink config + sinkURLParsed, err := url.Parse(u.String()) + if err != nil { + return nil, err + } + params := sinkURLParsed.Query() + params.Del(changefeedbase.SinkParamSkipTLSVerify) + params.Del(changefeedbase.SinkParamCACert) + params.Del(changefeedbase.SinkParamClientCert) + params.Del(changefeedbase.SinkParamClientKey) + sinkURLParsed.RawQuery = params.Encode() + sinkClient.url = sinkURL{URL: sinkURLParsed} + + return sinkClient, nil +} + +func makeWebhookClient( + u sinkURL, timeout time.Duration, parallelism int, +) (*httputil.Client, error) { + client := &httputil.Client{ + Client: &http.Client{ + Timeout: timeout, + Transport: &http.Transport{ + DialContext: (&net.Dialer{Timeout: timeout}).DialContext, + MaxConnsPerHost: parallelism, + MaxIdleConnsPerHost: parallelism, + IdleConnTimeout: time.Minute, + ForceAttemptHTTP2: true, + }, + }, + } + + dialConfig := struct { + tlsSkipVerify bool + caCert []byte + clientCert []byte + clientKey []byte + }{} + + transport := client.Transport.(*http.Transport) + + if _, err := u.consumeBool(changefeedbase.SinkParamSkipTLSVerify, &dialConfig.tlsSkipVerify); err != nil { + return nil, err + } + if err := u.decodeBase64(changefeedbase.SinkParamCACert, &dialConfig.caCert); err != nil { + return nil, err + } + if err := u.decodeBase64(changefeedbase.SinkParamClientCert, &dialConfig.clientCert); err != nil { + return nil, err + } + if err := u.decodeBase64(changefeedbase.SinkParamClientKey, &dialConfig.clientKey); err != nil { + return nil, err + } + + transport.TLSClientConfig = &tls.Config{ + 
InsecureSkipVerify: dialConfig.tlsSkipVerify, + } + + if dialConfig.caCert != nil { + caCertPool, err := x509.SystemCertPool() + if err != nil { + return nil, errors.Wrap(err, "could not load system root CA pool") + } + if caCertPool == nil { + caCertPool = x509.NewCertPool() + } + if !caCertPool.AppendCertsFromPEM(dialConfig.caCert) { + return nil, errors.Errorf("failed to parse certificate data:%s", string(dialConfig.caCert)) + } + transport.TLSClientConfig.RootCAs = caCertPool + } + + if dialConfig.clientCert != nil && dialConfig.clientKey == nil { + return nil, errors.Errorf(`%s requires %s to be set`, changefeedbase.SinkParamClientCert, changefeedbase.SinkParamClientKey) + } else if dialConfig.clientKey != nil && dialConfig.clientCert == nil { + return nil, errors.Errorf(`%s requires %s to be set`, changefeedbase.SinkParamClientKey, changefeedbase.SinkParamClientCert) + } + + if dialConfig.clientCert != nil && dialConfig.clientKey != nil { + cert, err := tls.X509KeyPair(dialConfig.clientCert, dialConfig.clientKey) + if err != nil { + return nil, errors.Wrap(err, `invalid client certificate data provided`) + } + transport.TLSClientConfig.Certificates = []tls.Certificate{cert} + } + + return client, nil +} + +func (wse *webhookSinkClient) makePayloadForBytes(body []byte) (SinkPayload, error) { + req, err := http.NewRequestWithContext(wse.ctx, http.MethodPost, wse.url.String(), bytes.NewReader(body)) + if err != nil { + return nil, err + } + switch wse.format { + case changefeedbase.OptFormatJSON: + req.Header.Set("Content-Type", applicationTypeJSON) + case changefeedbase.OptFormatCSV: + req.Header.Set("Content-Type", applicationTypeCSV) + } + + if wse.authHeader != "" { + req.Header.Set(authorizationHeader, wse.authHeader) + } + + return req, nil +} + +// MakeResolvedPayload implements the SinkClient interface +func (wse *webhookSinkClient) MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) { + return wse.makePayloadForBytes(body) +} + +// Flush implements the SinkClient interface +func (wse *webhookSinkClient) Flush(ctx context.Context, batch SinkPayload) error { + req := batch.(*http.Request) + res, err := wse.client.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + if !(res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusMultipleChoices) { + resBody, err := io.ReadAll(res.Body) + if err != nil { + return errors.Wrapf(err, "failed to read body for HTTP response with status: %d", res.StatusCode) + } + return fmt.Errorf("%s: %s", res.Status, string(resBody)) + } + return nil +} + +// Close implements the SinkClient interface +func (wse *webhookSinkClient) Close() error { + wse.client.CloseIdleConnections() + return nil +} + +func validateWebhookOpts( + u sinkURL, encodingOpts changefeedbase.EncodingOptions, opts changefeedbase.WebhookSinkOptions, +) error { + if u.Scheme != changefeedbase.SinkSchemeWebhookHTTPS { + return errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeHTTPS) + } + + switch encodingOpts.Format { + case changefeedbase.OptFormatJSON: + case changefeedbase.OptFormatCSV: + default: + return errors.Errorf(`this sink is incompatible with %s=%s`, + changefeedbase.OptFormat, encodingOpts.Format) + } + + switch encodingOpts.Envelope { + case changefeedbase.OptEnvelopeWrapped, changefeedbase.OptEnvelopeBare: + default: + return errors.Errorf(`this sink is incompatible with %s=%s`, + changefeedbase.OptEnvelope, encodingOpts.Envelope) + } + + encodingOpts.TopicInValue = true + + if encodingOpts.Envelope != 
changefeedbase.OptEnvelopeBare { + encodingOpts.KeyInValue = true + } + + return nil +} + +type webhookCSVBuffer struct { + bytes []byte + messageCount int + sc *webhookSinkClient +} + +var _ BatchBuffer = (*webhookCSVBuffer)(nil) + +// Append implements the BatchBuffer interface +func (cw *webhookCSVBuffer) Append(key []byte, value []byte, topic string) { + cw.bytes = append(cw.bytes, value...) + cw.messageCount += 1 +} + +// ShouldFlush implements the BatchBuffer interface +func (cw *webhookCSVBuffer) ShouldFlush() bool { + return cw.sc.shouldFlush(len(cw.bytes), cw.messageCount) +} + +// Close implements the BatchBuffer interface +func (cw *webhookCSVBuffer) Close() (SinkPayload, error) { + return cw.sc.makePayloadForBytes(cw.bytes) +} + +type webhookJSONBuffer struct { + messages [][]byte + numBytes int + sc *webhookSinkClient +} + +var _ BatchBuffer = (*webhookJSONBuffer)(nil) + +// Append implements the BatchBuffer interface +func (jw *webhookJSONBuffer) Append(key []byte, value []byte, topic string) { + jw.messages = append(jw.messages, value) + jw.numBytes += len(value) +} + +// ShouldFlush implements the BatchBuffer interface +func (jw *webhookJSONBuffer) ShouldFlush() bool { + return jw.sc.shouldFlush(jw.numBytes, len(jw.messages)) +} + +// Close implements the BatchBuffer interface +func (jw *webhookJSONBuffer) Close() (SinkPayload, error) { + var buffer bytes.Buffer + prefix := "{\"payload\":[" + suffix := fmt.Sprintf("],\"length\":%d}", len(jw.messages)) + + // Grow all at once to avoid reallocations + buffer.Grow(len(prefix) + jw.numBytes /* msgs */ + len(jw.messages) /* commas */ + len(suffix)) + + buffer.WriteString(prefix) + for i, msg := range jw.messages { + if i != 0 { + buffer.WriteByte(',') + } + buffer.Write(msg) + } + buffer.WriteString(suffix) + return jw.sc.makePayloadForBytes(buffer.Bytes()) +} + +func (wse *webhookSinkClient) MakeBatchBuffer() BatchBuffer { + if wse.format == changefeedbase.OptFormatCSV { + return &webhookCSVBuffer{sc: wse} + } else { + return &webhookJSONBuffer{ + sc: wse, + messages: make([][]byte, 0, wse.batchCfg.Messages), + } + } +} + +func (wse *webhookSinkClient) shouldFlush(bytes int, messages int) bool { + switch { + // all zero values is interpreted as flush every time + case wse.batchCfg.Messages == 0 && wse.batchCfg.Bytes == 0 && wse.batchCfg.Frequency == 0: + return true + // messages threshold has been reached + case wse.batchCfg.Messages > 0 && messages >= wse.batchCfg.Messages: + return true + // bytes threshold has been reached + case wse.batchCfg.Bytes > 0 && bytes >= wse.batchCfg.Bytes: + return true + default: + return false + } +} + +func makeWebhookSink( + ctx context.Context, + u sinkURL, + encodingOpts changefeedbase.EncodingOptions, + opts changefeedbase.WebhookSinkOptions, + parallelism int, + pacerFactory func() *admission.Pacer, + source timeutil.TimeSource, + mb metricsRecorderBuilder, +) (Sink, error) { + batchCfg, retryOpts, err := getSinkConfigFromJson(opts.JSONConfig) + if err != nil { + return nil, err + } + + sinkClient, err := makeWebhookSinkClient(ctx, u, encodingOpts, opts, batchCfg, parallelism) + if err != nil { + return nil, err + } + + return makeBatchingSink( + ctx, + sinkTypeWebhook, + sinkClient, + time.Duration(batchCfg.Frequency), + retryOpts, + parallelism, + nil, + pacerFactory, + source, + mb(requiresResourceAccounting), + ), nil +} diff --git a/pkg/ccl/changefeedccl/telemetry.go b/pkg/ccl/changefeedccl/telemetry.go index 1788b8789069..906b634a29d9 100644 --- a/pkg/ccl/changefeedccl/telemetry.go 
+++ b/pkg/ccl/changefeedccl/telemetry.go @@ -136,6 +136,8 @@ type telemetryMetricsRecorder struct { inner metricsRecorder } +var _ metricsRecorder = (*telemetryMetricsRecorder)(nil) + func (r *telemetryMetricsRecorder) close() { r.telemetryLogger.close() } @@ -184,6 +186,14 @@ func (r *telemetryMetricsRecorder) recordSizeBasedFlush() { r.inner.recordSizeBasedFlush() } +func (r *telemetryMetricsRecorder) recordParallelIOQueueLatency(latency time.Duration) { + r.inner.recordParallelIOQueueLatency(latency) +} + +func (r *telemetryMetricsRecorder) recordSinkIOInflightChange(delta int64) { + r.inner.recordSinkIOInflightChange(delta) +} + // ContinuousTelemetryInterval determines the interval at which each node emits telemetry events // during the lifespan of each enterprise changefeed. var ContinuousTelemetryInterval = settings.RegisterDurationSetting( diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index dd1dc9da998c..83c25cf37c0c 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -1235,6 +1235,7 @@ func registerCDC(r registry.Registry) { ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "30m"}) + // The deprecated webhook sink is unable to handle the throughput required for 100 warehouses if _, err := ct.DB().Exec("SET CLUSTER SETTING changefeed.new_webhook_sink_enabled = true;"); err != nil { ct.t.Fatal(err) }
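
For readers following the `TestSinkConfigParsing` cases added in this patch, here is a minimal standalone sketch of the JSON shape that `getSinkConfigFromJson` accepts, including the `Retry.Max` convention of taking either a positive integer or the string `"inf"` (stored as zero, which the retry options treat as unlimited). The `example*` names are invented for illustration, and the real code decodes `Frequency`/`Backoff` into a `jsonDuration` rather than a plain string, so treat this as a sketch of the behaviour rather than the tree's types.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
	"time"
)

// exampleMaxRetries mirrors the "positive int or 'inf'" behaviour exercised by
// TestSinkConfigParsing; zero is used to mean "retry forever".
type exampleMaxRetries int

func (m *exampleMaxRetries) UnmarshalJSON(b []byte) error {
	if i, err := strconv.ParseInt(string(b), 10, 64); err == nil {
		if i <= 0 {
			return fmt.Errorf("Retry.Max must be a positive int")
		}
		*m = exampleMaxRetries(i)
		return nil
	}
	var s string
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}
	if strings.EqualFold(s, "inf") {
		*m = 0 // zero is interpreted as infinite retries
		return nil
	}
	return fmt.Errorf("Retry.Max must be either a positive int or 'inf'")
}

// exampleSinkConfig is the overall shape of the sink config JSON blob.
type exampleSinkConfig struct {
	Flush struct {
		Messages  int    `json:",omitempty"`
		Bytes     int    `json:",omitempty"`
		Frequency string `json:",omitempty"` // a duration string such as "3s"
	} `json:",omitempty"`
	Retry struct {
		Max     exampleMaxRetries `json:",omitempty"`
		Backoff string            `json:",omitempty"`
	} `json:",omitempty"`
}

func main() {
	raw := `{"Flush": {"Messages": 1234, "Frequency": "3s", "Bytes": 30},
	         "Retry": {"Max": "inf", "Backoff": "500ms"}}`

	var cfg exampleSinkConfig
	if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
		panic(err)
	}
	freq, _ := time.ParseDuration(cfg.Flush.Frequency)
	fmt.Println(cfg.Flush.Messages, cfg.Flush.Bytes, freq, cfg.Retry.Max)
	// Prints: 1234 30 3s 0
}
```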
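
The batching rules these tests pin down come in two halves: configuration validation rejects size-based batching that has no `Flush.Frequency` (a partially filled batch could otherwise sit in the buffer forever), while the per-append `ShouldFlush` check sends a batch once a threshold is hit, with an all-zero config meaning every row is flushed immediately. A behavioural sketch of those two checks, using made-up names rather than the tree's types:

```go
package main

import "fmt"

type flushConfig struct {
	Messages, Bytes int
	FrequencySet    bool
}

// validate mirrors the up-front config check: size-based batching without a
// flush frequency is rejected, since a partial batch might never be sent.
func validate(c flushConfig) error {
	if (c.Messages > 0 || c.Bytes > 0) && !c.FrequencySet {
		return fmt.Errorf("Flush.Frequency is not set, messages may never be sent")
	}
	return nil
}

// shouldFlush mirrors the per-append check on the buffer.
func shouldFlush(c flushConfig, bufferedBytes, bufferedMessages int) bool {
	switch {
	case c.Messages == 0 && c.Bytes == 0 && !c.FrequencySet:
		return true // no batching configured: flush on every append
	case c.Messages > 0 && bufferedMessages >= c.Messages:
		return true // message-count threshold reached
	case c.Bytes > 0 && bufferedBytes >= c.Bytes:
		return true // byte-size threshold reached
	default:
		return false
	}
}

func main() {
	cfg := flushConfig{Messages: 2, FrequencySet: true}
	fmt.Println(validate(cfg) == nil)                             // true
	fmt.Println(shouldFlush(cfg, 10, 1), shouldFlush(cfg, 10, 2)) // false true
}
```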
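
For anyone inspecting the webhook traffic itself, the JSON path stitches the already-encoded rows into a `{"payload": [...], "length": N}` envelope by hand rather than re-encoding them. The sketch below mimics that stitching in isolation; the helper name and the row contents are invented.

```go
package main

import (
	"bytes"
	"fmt"
)

// stitchPayload mimics what webhookJSONBuffer.Close does: wrap pre-encoded row
// messages in a {"payload":[...],"length":N} envelope without another pass
// through json.Marshal.
func stitchPayload(messages [][]byte) []byte {
	var buf bytes.Buffer
	prefix := `{"payload":[`
	suffix := fmt.Sprintf(`],"length":%d}`, len(messages))

	total := 0
	for _, m := range messages {
		total += len(m)
	}
	// Grow once up front: prefix + rows + separating commas + suffix.
	buf.Grow(len(prefix) + total + len(messages) + len(suffix))

	buf.WriteString(prefix)
	for i, m := range messages {
		if i != 0 {
			buf.WriteByte(',')
		}
		buf.Write(m)
	}
	buf.WriteString(suffix)
	return buf.Bytes()
}

func main() {
	// Hypothetical, already-encoded changefeed rows.
	rows := [][]byte{
		[]byte(`{"after":{"col1":"val1","rowid":1000},"key":[1001],"topic":"foo"}`),
		[]byte(`{"after":{"col1":"val1","rowid":1001},"key":[1001],"topic":"foo"}`),
	}
	fmt.Println(string(stitchPayload(rows)))
	// Prints: {"payload":[{"after":...},{"after":...}],"length":2}
}
```

A CSV-format changefeed takes the other buffer implementation and simply concatenates the encoded rows into a single request body.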