diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 4a2627ac20ea..ae43a041aab6 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -17,6 +17,7 @@ changefeed.event_consumer_workers integer 0 the number of workers to use when pr
changefeed.fast_gzip.enabled boolean true use fast gzip implementation
changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds
changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables
+changefeed.sink_io_workers integer 0 the number of workers used by changefeeds when sending requests to the sink (currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value.
cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload
cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage
cloudstorage.timeout duration 10m0s the timeout for import/export storage operations
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 82f6ecf1805c..6a94ca4a42f1 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -23,6 +23,7 @@
changefeed.fast_gzip.enabled
| boolean | true | use fast gzip implementation |
changefeed.node_throttle_config
| string |
| specifies node level throttling configuration for all changefeeeds |
changefeed.schema_feed.read_with_priority_after
| duration | 1m0s | retry with high priority if we were not able to read descriptors for too long; 0 disables |
+changefeed.sink_io_workers
| integer | 0 | the number of workers used by changefeeds when sending requests to the sink (currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. |
cloudstorage.azure.concurrent_upload_buffers
| integer | 1 | controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload |
cloudstorage.http.custom_ca
| string |
| custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage |
cloudstorage.timeout
| duration | 10m0s | the timeout for import/export storage operations |
diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel
index e98d411c77ee..5398084d821e 100644
--- a/pkg/ccl/changefeedccl/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/BUILD.bazel
@@ -7,6 +7,7 @@ go_library(
"alter_changefeed_stmt.go",
"authorization.go",
"avro.go",
+ "batching_sink.go",
"changefeed.go",
"changefeed_dist.go",
"changefeed_processors.go",
@@ -20,6 +21,7 @@ go_library(
"event_processing.go",
"metrics.go",
"name.go",
+ "parallel_io.go",
"parquet_sink_cloudstorage.go",
"retry.go",
"scheduled_changefeed.go",
@@ -32,6 +34,7 @@ go_library(
"sink_pubsub.go",
"sink_sql.go",
"sink_webhook.go",
+ "sink_webhook_v2.go",
"telemetry.go",
"testing_knobs.go",
"tls.go",
@@ -126,6 +129,7 @@ go_library(
"//pkg/util/hlc",
"//pkg/util/httputil",
"//pkg/util/humanizeutil",
+ "//pkg/util/intsets",
"//pkg/util/json",
"//pkg/util/log",
"//pkg/util/log/eventpb",
diff --git a/pkg/ccl/changefeedccl/batching_sink.go b/pkg/ccl/changefeedccl/batching_sink.go
new file mode 100644
index 000000000000..d764d7abcab6
--- /dev/null
+++ b/pkg/ccl/changefeedccl/batching_sink.go
@@ -0,0 +1,482 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package changefeedccl
+
+import (
+ "context"
+ "fmt"
+ "hash"
+ "sync"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
+ "github.com/cockroachdb/cockroach/pkg/util/admission"
+ "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
+ "github.com/cockroachdb/cockroach/pkg/util/hlc"
+ "github.com/cockroachdb/cockroach/pkg/util/intsets"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/retry"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+)
+
+// SinkClient is an interface to an external sink, where messages are written
+// into batches as they arrive and once ready are flushed out.
+type SinkClient interface {
+ MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
+ MakeBatchBuffer() BatchBuffer
+ Flush(context.Context, SinkPayload) error
+ Close() error
+}
+
+// BatchBuffer is an interface to aggregate KVs into a payload that can be sent
+// to the sink.
+type BatchBuffer interface {
+ Append(key []byte, value []byte, topic string)
+ ShouldFlush() bool
+
+ // Once all data has been Append'ed, Close can be called to return a finalized
+ // Payload that is valid for a pure IO operation via Flush. All final encoding
+ // work (ex: wrapping an array in an object with metadata) should be done in
+ // Close, as non-io-related work done in Flush would be unnecessarily repeated
+ // upon retries.
+ Close() (SinkPayload, error)
+}
+
+// SinkPayload is a sink-specific representation of a batch of messages that is
+// ready to be emitted by the SinkClient's Flush method.
+type SinkPayload interface{}
+
+// batchingSink wraps a SinkClient to provide a Sink implementation that calls
+// the SinkClient methods to form batches and flushes those batches across
+// multiple parallel IO workers.
+type batchingSink struct {
+ client SinkClient
+ topicNamer *TopicNamer
+ concreteType sinkType
+
+ ioWorkers int
+ minFlushFrequency time.Duration
+ retryOpts retry.Options
+
+ ts timeutil.TimeSource
+ metrics metricsRecorder
+ knobs batchingSinkKnobs
+
+ // eventCh is the channel used to send requests from the Sink caller routines
+ // to the batching routine. Messages can either be a flushReq or a rowEvent.
+ eventCh chan interface{}
+
+ pacer *admission.Pacer
+ pacerFactory func() *admission.Pacer
+
+ termErr error
+ wg ctxgroup.Group
+ hasher hash.Hash32
+ doneCh chan struct{}
+}
+
+type batchingSinkKnobs struct {
+ OnAppend func(*rowEvent)
+}
+
+type flushReq struct {
+ waiter chan struct{}
+}
+
+type rowEvent struct {
+ key []byte
+ val []byte
+ topic string
+
+ alloc kvevent.Alloc
+ mvcc hlc.Timestamp
+}
+
+// Flush implements the Sink interface, returning the first error that has
+// occurred in the past EmitRow calls.
+func (s *batchingSink) Flush(ctx context.Context) error {
+ defer s.metrics.recordFlushRequestCallback()()
+ flushWaiter := make(chan struct{})
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-s.doneCh:
+ case s.eventCh <- flushReq{waiter: flushWaiter}:
+ }
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-s.doneCh:
+ return nil
+ case <-flushWaiter:
+ if s.termErr != nil {
+ return s.termErr
+ }
+ }
+
+ // Refresh the pacer in case any settings have changed. s.pacer can safely be
+ // assigned since once the Flush has completed waiting, no new messages exist
+ // to be processed so pacer.Pace won't be called by the batching worker.
+ s.pacer = s.pacerFactory()
+
+ return nil
+}
+
+var _ Sink = (*batchingSink)(nil)
+
+// Event structs and batch structs which are transferred across routines (and
+// therefore escape to the heap) can both be incredibly frequent (every event
+// may be its own batch) and temporary, so to avoid GC thrashing they are both
+// claimed and freed from object pools.
+var eventPool sync.Pool = sync.Pool{
+ New: func() interface{} {
+ return new(rowEvent)
+ },
+}
+
+func newRowEvent() *rowEvent {
+ return eventPool.Get().(*rowEvent)
+}
+func freeRowEvent(e *rowEvent) {
+ *e = rowEvent{}
+ eventPool.Put(e)
+}
+
+var batchPool sync.Pool = sync.Pool{
+ New: func() interface{} {
+ return new(sinkBatch)
+ },
+}
+
+func newSinkBatch() *sinkBatch {
+ return batchPool.Get().(*sinkBatch)
+}
+func freeSinkBatchEvent(b *sinkBatch) {
+ *b = sinkBatch{}
+ batchPool.Put(b)
+}
+
+// EmitRow implements the Sink interface.
+func (s *batchingSink) EmitRow(
+ ctx context.Context,
+ topic TopicDescriptor,
+ key, value []byte,
+ updated, mvcc hlc.Timestamp,
+ alloc kvevent.Alloc,
+) error {
+ s.metrics.recordMessageSize(int64(len(key) + len(value)))
+
+ payload := newRowEvent()
+ payload.key = key
+ payload.val = value
+ payload.topic = "" // unimplemented for now
+ payload.mvcc = mvcc
+ payload.alloc = alloc
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case s.eventCh <- payload:
+ case <-s.doneCh:
+ }
+
+ return nil
+}
+
+// EmitResolvedTimestamp implements the Sink interface.
+func (s *batchingSink) EmitResolvedTimestamp(
+ ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
+) error {
+ data, err := encoder.EncodeResolvedTimestamp(ctx, "", resolved)
+ if err != nil {
+ return err
+ }
+ payload, err := s.client.MakeResolvedPayload(data, "")
+ if err != nil {
+ return err
+ }
+
+ if err = s.Flush(ctx); err != nil {
+ return err
+ }
+ return retry.WithMaxAttempts(ctx, s.retryOpts, s.retryOpts.MaxRetries+1, func() error {
+ return s.client.Flush(ctx, payload)
+ })
+}
+
+// Close implements the Sink interface.
+func (s *batchingSink) Close() error {
+ close(s.doneCh)
+ _ = s.wg.Wait()
+ s.pacer.Close()
+ return s.client.Close()
+}
+
+// Dial implements the Sink interface.
+func (s *batchingSink) Dial() error {
+ return nil
+}
+
+// getConcreteType implements the Sink interface.
+func (s *batchingSink) getConcreteType() sinkType {
+ return s.concreteType
+}
+
+// sinkBatch stores an in-progress/complete batch of messages, along with
+// metadata related to the batch.
+type sinkBatch struct {
+ buffer BatchBuffer
+ payload SinkPayload // payload is nil until FinalizePayload has been called
+
+ numMessages int
+ numKVBytes int // the total amount of uncompressed kv data in the batch
+ keys intsets.Fast // the set of keys within the batch to provide to parallelIO
+ bufferTime time.Time // the earliest time a message was inserted into the batch
+ mvcc hlc.Timestamp
+
+ alloc kvevent.Alloc
+ hasher hash.Hash32
+}
+
+// FinalizePayload closes the batch buffer to produce a payload that is ready
+// to be Flushed by the SinkClient.
+func (sb *sinkBatch) FinalizePayload() error {
+ payload, err := sb.buffer.Close()
+ if err != nil {
+ return err
+ }
+ sb.payload = payload
+ return nil
+}
+
+// Keys implements the IORequest interface.
+func (sb *sinkBatch) Keys() intsets.Fast {
+ return sb.keys
+}
+
+func (sb *sinkBatch) isEmpty() bool {
+ return sb.numMessages == 0
+}
+
+func hashToInt(h hash.Hash32, buf []byte) int {
+ h.Reset()
+ h.Write(buf)
+ return int(h.Sum32())
+}
+
+// Append adds the contents of a rowEvent to the batch, merging its alloc pool.
+func (sb *sinkBatch) Append(e *rowEvent) {
+ if sb.isEmpty() {
+ sb.bufferTime = timeutil.Now()
+ }
+
+ sb.buffer.Append(e.key, e.val, e.topic)
+
+ sb.keys.Add(hashToInt(sb.hasher, e.key))
+ sb.numMessages += 1
+ sb.numKVBytes += len(e.key) + len(e.val)
+
+ if sb.mvcc.IsEmpty() || e.mvcc.Less(sb.mvcc) {
+ sb.mvcc = e.mvcc
+ }
+
+ sb.alloc.Merge(&e.alloc)
+}
+
+func (s *batchingSink) handleError(err error) {
+ if s.termErr == nil {
+ s.termErr = err
+ }
+}
+
+func (s *batchingSink) newBatchBuffer() *sinkBatch {
+ batch := newSinkBatch()
+ batch.buffer = s.client.MakeBatchBuffer()
+ batch.hasher = s.hasher
+ return batch
+}
+
+// runBatchingWorker combines 1 or more row events into batches, sending the IO
+// requests out either once the batch is full or a flush request arrives.
+func (s *batchingSink) runBatchingWorker(ctx context.Context) {
+ batchBuffer := s.newBatchBuffer()
+
+ // Once finalized, batches are sent to a parallelIO struct which handles
+ // performing multiple Flushes in parallel while maintaining Keys() ordering.
+ ioHandler := func(ctx context.Context, req IORequest) error {
+ batch, _ := req.(*sinkBatch)
+ defer s.metrics.recordSinkIOInflightChange(int64(-batch.numMessages))
+ s.metrics.recordSinkIOInflightChange(int64(batch.numMessages))
+ return s.client.Flush(ctx, batch.payload)
+ }
+ ioEmitter := newParallelIO(ctx, s.retryOpts, s.ioWorkers, ioHandler, s.metrics)
+ defer ioEmitter.Close()
+
+ // Flushing requires tracking the number of inflight messages and confirming
+ // completion to the requester once the counter reaches 0.
+ inflight := 0
+ var sinkFlushWaiter chan struct{}
+
+ handleResult := func(result *ioResult) {
+ batch, _ := result.request.(*sinkBatch)
+
+ if result.err != nil {
+ s.handleError(result.err)
+ } else {
+ s.metrics.recordEmittedBatch(
+ batch.bufferTime, batch.numMessages, batch.mvcc, batch.numKVBytes, sinkDoesNotCompress,
+ )
+ }
+
+ inflight -= batch.numMessages
+
+ if (result.err != nil || inflight == 0) && sinkFlushWaiter != nil {
+ close(sinkFlushWaiter)
+ sinkFlushWaiter = nil
+ }
+
+ freeIOResult(result)
+ batch.alloc.Release(ctx)
+ freeSinkBatchEvent(batch)
+ }
+
+ tryFlushBatch := func() error {
+ if batchBuffer.isEmpty() {
+ return nil
+ }
+ toFlush := batchBuffer
+ batchBuffer = s.newBatchBuffer()
+
+ if err := toFlush.FinalizePayload(); err != nil {
+ return err
+ }
+
+ // Emitting needs to also handle any incoming results to avoid a deadlock
+ // with trying to emit while the emitter is blocked on returning a result.
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case ioEmitter.requestCh <- toFlush:
+ case result := <-ioEmitter.resultCh:
+ handleResult(result)
+ continue
+ case <-s.doneCh:
+ }
+ break
+ }
+
+ return nil
+ }
+
+ flushTimer := s.ts.NewTimer()
+ defer flushTimer.Stop()
+
+ for {
+ select {
+ case req := <-s.eventCh:
+ if err := s.pacer.Pace(ctx); err != nil {
+ if pacerLogEvery.ShouldLog() {
+ log.Errorf(ctx, "automatic sink batcher pacing: %v", err)
+ }
+ }
+
+ switch r := req.(type) {
+ case *rowEvent:
+ if s.termErr != nil {
+ continue
+ }
+
+ inflight += 1
+
+ // If we're about to append to an empty batch, start the timer to
+ // guarantee the messages do not stay buffered longer than the
+ // configured frequency.
+ if batchBuffer.isEmpty() && s.minFlushFrequency > 0 {
+ flushTimer.Reset(s.minFlushFrequency)
+ }
+
+ batchBuffer.Append(r)
+ if s.knobs.OnAppend != nil {
+ s.knobs.OnAppend(r)
+ }
+
+ // The event struct can be freed as the contents are expected to be
+ // managed by the batch instead.
+ freeRowEvent(r)
+
+ if batchBuffer.buffer.ShouldFlush() {
+ s.metrics.recordSizeBasedFlush()
+ if err := tryFlushBatch(); err != nil {
+ s.handleError(err)
+ }
+ }
+ case flushReq:
+ if inflight == 0 || s.termErr != nil {
+ close(r.waiter)
+ } else {
+ sinkFlushWaiter = r.waiter
+ if err := tryFlushBatch(); err != nil {
+ s.handleError(err)
+ }
+ }
+ default:
+ s.handleError(fmt.Errorf("received unknown request of unknown type: %v", r))
+ }
+ case result := <-ioEmitter.resultCh:
+ handleResult(result)
+ case <-flushTimer.Ch():
+ flushTimer.MarkRead()
+ if err := tryFlushBatch(); err != nil {
+ s.handleError(err)
+ }
+ case <-ctx.Done():
+ return
+ case <-s.doneCh:
+ return
+ }
+ }
+}
+
+func makeBatchingSink(
+ ctx context.Context,
+ concreteType sinkType,
+ client SinkClient,
+ minFlushFrequency time.Duration,
+ retryOpts retry.Options,
+ numWorkers int,
+ topicNamer *TopicNamer,
+ pacerFactory func() *admission.Pacer,
+ timeSource timeutil.TimeSource,
+ metrics metricsRecorder,
+) Sink {
+ sink := &batchingSink{
+ client: client,
+ topicNamer: topicNamer,
+ concreteType: concreteType,
+ minFlushFrequency: minFlushFrequency,
+ ioWorkers: numWorkers,
+ retryOpts: retryOpts,
+ ts: timeSource,
+ metrics: metrics,
+ eventCh: make(chan interface{}, flushQueueDepth),
+ wg: ctxgroup.WithContext(ctx),
+ hasher: makeHasher(),
+ pacerFactory: pacerFactory,
+ pacer: pacerFactory(),
+ doneCh: make(chan struct{}),
+ }
+
+ sink.wg.GoCtx(func(ctx context.Context) error {
+ sink.runBatchingWorker(ctx)
+ return nil
+ })
+ return sink
+}
diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index fde56353bfa3..76032968ded5 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -564,12 +564,19 @@ func (ca *changeAggregator) tick() error {
return ca.noteResolvedSpan(resolved)
}
case kvevent.TypeFlush:
- return ca.sink.Flush(ca.Ctx())
+ return ca.flushBufferedEvents()
}
return nil
}
+func (ca *changeAggregator) flushBufferedEvents() error {
+ if err := ca.eventConsumer.Flush(ca.Ctx()); err != nil {
+ return err
+ }
+ return ca.sink.Flush(ca.Ctx())
+}
+
// noteResolvedSpan periodically flushes Frontier progress from the current
// changeAggregator node to the changeFrontier node to allow the changeFrontier
// to persist the overall changefeed's progress
@@ -621,7 +628,7 @@ func (ca *changeAggregator) flushFrontier() error {
// otherwise, we could lose buffered messages and violate the
// at-least-once guarantee. This is also true for checkpointing the
// resolved spans in the job progress.
- if err := ca.sink.Flush(ca.Ctx()); err != nil {
+ if err := ca.flushBufferedEvents(); err != nil {
return err
}
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index feb6d73e089d..70687259f535 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -4877,17 +4877,17 @@ func TestChangefeedErrors(t *testing.T) {
`webhook-https://fake-host`,
)
sqlDB.ExpectErr(
- t, `invalid option value webhook_sink_config, all config values must be non-negative`,
+ t, `invalid sink config, all values must be non-negative`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Flush": {"Messages": -100, "Frequency": "1s"}}'`,
`webhook-https://fake-host`,
)
sqlDB.ExpectErr(
- t, `invalid option value webhook_sink_config, all config values must be non-negative`,
+ t, `invalid sink config, all values must be non-negative`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Flush": {"Messages": 100, "Frequency": "-1s"}}'`,
`webhook-https://fake-host`,
)
sqlDB.ExpectErr(
- t, `invalid option value webhook_sink_config, flush frequency is not set, messages may never be sent`,
+ t, `invalid sink config, Flush.Frequency is not set, messages may never be sent`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Flush": {"Messages": 100}}'`,
`webhook-https://fake-host`,
)
@@ -4907,17 +4907,17 @@ func TestChangefeedErrors(t *testing.T) {
`webhook-https://fake-host`,
)
sqlDB.ExpectErr(
- t, `max retries must be either a positive int or 'inf' for infinite retries.`,
+ t, `Retry.Max must be either a positive int or 'inf' for infinite retries.`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Retry": {"Max": "not valid"}}'`,
`webhook-https://fake-host`,
)
sqlDB.ExpectErr(
- t, `max retry count must be a positive integer. use 'inf' for infinite retries.`,
+ t, `Retry.Max must be a positive integer. use 'inf' for infinite retries.`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Retry": {"Max": 0}}'`,
`webhook-https://fake-host`,
)
sqlDB.ExpectErr(
- t, `max retry count must be a positive integer. use 'inf' for infinite retries.`,
+ t, `Retry.Max must be a positive integer. use 'inf' for infinite retries.`,
`CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Retry": {"Max": -1}}'`,
`webhook-https://fake-host`,
)
diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go
index abb8292b3867..9bd743c06ee2 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/settings.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -262,9 +262,9 @@ var EventConsumerPacerRequestSize = settings.RegisterDurationSetting(
settings.PositiveDuration,
)
-// EventConsumerElasticCPUControlEnabled determines whether changefeed event
+// PerEventElasticCPUControlEnabled determines whether changefeed event
// processing integrates with elastic CPU control.
-var EventConsumerElasticCPUControlEnabled = settings.RegisterBoolSetting(
+var PerEventElasticCPUControlEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"changefeed.cpu.per_event_elastic_control.enabled",
"determines whether changefeed event processing integrates with elastic CPU control",
@@ -281,3 +281,27 @@ var RequireExternalConnectionSink = settings.RegisterBoolSetting(
" see https://www.cockroachlabs.com/docs/stable/create-external-connection.html",
false,
)
+
+// SinkIOWorkers controls the number of IO workers used by sinks that use
+// parallelIO to be able to send multiple requests in parallel.
+var SinkIOWorkers = settings.RegisterIntSetting(
+ settings.TenantWritable,
+ "changefeed.sink_io_workers",
+ "the number of workers used by changefeeds when sending requests to the sink "+
+ "(currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value.",
+ 0,
+).WithPublic()
+
+// SinkPacerRequestSize specifies how often (measured in CPU time)
+// the Sink batching worker requests CPU time from admission control. For
+// example, every N milliseconds of CPU work, request N more milliseconds of CPU
+// time.
+var SinkPacerRequestSize = settings.RegisterDurationSetting(
+ settings.TenantWritable,
+ "changefeed.cpu.sink_encoding_allocation",
+ "an event consumer worker will perform a blocking request for CPU time "+
+ "before consuming events. after fully utilizing this CPU time, it will "+
+ "request more",
+ 50*time.Millisecond,
+ settings.PositiveDuration,
+)
diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go
index 2861515fe03e..759e6a207f77 100644
--- a/pkg/ccl/changefeedccl/event_processing.go
+++ b/pkg/ccl/changefeedccl/event_processing.go
@@ -104,7 +104,7 @@ func newEventConsumer(
}
pacerRequestUnit := changefeedbase.EventConsumerPacerRequestSize.Get(&cfg.Settings.SV)
- enablePacer := changefeedbase.EventConsumerElasticCPUControlEnabled.Get(&cfg.Settings.SV)
+ enablePacer := changefeedbase.PerEventElasticCPUControlEnabled.Get(&cfg.Settings.SV)
makeConsumer := func(s EventSink, frontier frontier) (eventConsumer, error) {
var err error
@@ -182,7 +182,10 @@ func newEventConsumer(
workerChSize: changefeedbase.EventConsumerWorkerQueueSize.Get(&cfg.Settings.SV),
spanFrontier: spanFrontier,
}
- ss := &safeSink{wrapped: sink, beforeFlush: c.Flush}
+ ss := sink
+ if !sinkSupportsConcurrentEmits(sink) {
+ ss = &safeSink{wrapped: sink}
+ }
c.makeConsumer = func() (eventConsumer, error) {
return makeConsumer(ss, c)
}
diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go
index 209eb5bc9291..53ee7c488705 100644
--- a/pkg/ccl/changefeedccl/helpers_test.go
+++ b/pkg/ccl/changefeedccl/helpers_test.go
@@ -853,6 +853,12 @@ func randomSinkTypeWithOptions(options feedTestOptions) string {
for _, weight := range sinkWeights {
weightTotal += weight
}
+ if weightTotal == 0 {
+ // This exists for testing purposes, where one may want to run all tests on
+ // the same sink and set sinkWeights to be 1 only for that sink, but some
+ // tests explicitly disallow that sink and therefore have no valid sinks.
+ return "skip"
+ }
p := rand.Float32() * float32(weightTotal)
var sum float32 = 0
for sink, weight := range sinkWeights {
@@ -861,7 +867,7 @@ func randomSinkTypeWithOptions(options feedTestOptions) string {
return sink
}
}
- return "kafka" // unreachable
+ return "skip" // unreachable
}
// addCloudStorageOptions adds the options necessary to enable a server to run a
@@ -967,6 +973,9 @@ func cdcTestNamedWithSystem(
cleanupCloudStorage := addCloudStorageOptions(t, &options)
sinkType := randomSinkTypeWithOptions(options)
+ if sinkType == "skip" {
+ return
+ }
testLabel := sinkType
if name != "" {
testLabel = fmt.Sprintf("%s/%s", sinkType, name)
diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go
index cfe92a54a13a..4d9539ebe213 100644
--- a/pkg/ccl/changefeedccl/metrics.go
+++ b/pkg/ccl/changefeedccl/metrics.go
@@ -32,6 +32,7 @@ const (
changefeedCheckpointHistMaxLatency = 30 * time.Second
changefeedBatchHistMaxLatency = 30 * time.Second
changefeedFlushHistMaxLatency = 1 * time.Minute
+ changefeedIOQueueMaxLatency = 5 * time.Minute
admitLatencyMaxValue = 1 * time.Minute
commitLatencyMaxValue = 10 * time.Minute
)
@@ -55,6 +56,8 @@ type AggMetrics struct {
Flushes *aggmetric.AggCounter
FlushHistNanos *aggmetric.AggHistogram
SizeBasedFlushes *aggmetric.AggCounter
+ ParallelIOQueueNanos *aggmetric.AggHistogram
+ SinkIOInflight *aggmetric.AggGauge
CommitLatency *aggmetric.AggHistogram
BackfillCount *aggmetric.AggGauge
BackfillPendingRanges *aggmetric.AggGauge
@@ -92,6 +95,8 @@ type metricsRecorder interface {
getBackfillCallback() func() func()
getBackfillRangeCallback() func(int64) (func(), func())
recordSizeBasedFlush()
+ recordParallelIOQueueLatency(time.Duration)
+ recordSinkIOInflightChange(int64)
}
var _ metricsRecorder = (*sliMetrics)(nil)
@@ -111,6 +116,8 @@ type sliMetrics struct {
Flushes *aggmetric.Counter
FlushHistNanos *aggmetric.Histogram
SizeBasedFlushes *aggmetric.Counter
+ ParallelIOQueueNanos *aggmetric.Histogram
+ SinkIOInflight *aggmetric.Gauge
CommitLatency *aggmetric.Histogram
ErrorRetries *aggmetric.Counter
AdmitLatency *aggmetric.Histogram
@@ -244,6 +251,20 @@ func (m *sliMetrics) recordSizeBasedFlush() {
m.SizeBasedFlushes.Inc(1)
}
+func (m *sliMetrics) recordParallelIOQueueLatency(latency time.Duration) {
+ if m == nil {
+ return
+ }
+ m.ParallelIOQueueNanos.RecordValue(latency.Nanoseconds())
+}
+func (m *sliMetrics) recordSinkIOInflightChange(delta int64) {
+ if m == nil {
+ return
+ }
+
+ m.SinkIOInflight.Inc(delta)
+}
+
type wrappingCostController struct {
ctx context.Context
inner metricsRecorder
@@ -314,6 +335,14 @@ func (w *wrappingCostController) recordSizeBasedFlush() {
w.inner.recordSizeBasedFlush()
}
+func (w *wrappingCostController) recordParallelIOQueueLatency(latency time.Duration) {
+ w.inner.recordParallelIOQueueLatency(latency)
+}
+
+func (w *wrappingCostController) recordSinkIOInflightChange(delta int64) {
+ w.inner.recordSinkIOInflightChange(delta)
+}
+
var (
metaChangefeedForwardedResolvedMessages = metric.Metadata{
Name: "changefeed.forwarded_resolved_messages",
@@ -510,6 +539,18 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Measurement: "Registrations",
Unit: metric.Unit_COUNT,
}
+ metaChangefeedParallelIOQueueNanos := metric.Metadata{
+ Name: "changefeed.parallel_io_queue_nanos",
+ Help: "Time spent with outgoing requests to the sink waiting in queue due to inflight requests with conflicting keys",
+ Measurement: "Changefeeds",
+ Unit: metric.Unit_NANOSECONDS,
+ }
+ metaChangefeedSinkIOInflight := metric.Metadata{
+ Name: "changefeed.sink_io_inflight",
+ Help: "The number of keys currently inflight as IO requests being sent to the sink",
+ Measurement: "Messages",
+ Unit: metric.Unit_COUNT,
+ }
// NB: When adding new histograms, use sigFigs = 1. Older histograms
// retain significant figures of 2.
b := aggmetric.MakeBuilder("scope")
@@ -528,6 +569,14 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
FlushedBytes: b.Counter(metaChangefeedFlushedBytes),
Flushes: b.Counter(metaChangefeedFlushes),
SizeBasedFlushes: b.Counter(metaSizeBasedFlushes),
+ ParallelIOQueueNanos: b.Histogram(metric.HistogramOptions{
+ Metadata: metaChangefeedParallelIOQueueNanos,
+ Duration: histogramWindow,
+ MaxVal: changefeedIOQueueMaxLatency.Nanoseconds(),
+ SigFigs: 2,
+ Buckets: metric.BatchProcessLatencyBuckets,
+ }),
+ SinkIOInflight: b.Gauge(metaChangefeedSinkIOInflight),
BatchHistNanos: b.Histogram(metric.HistogramOptions{
Metadata: metaChangefeedBatchHistNanos,
@@ -611,6 +660,8 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
Flushes: a.Flushes.AddChild(scope),
FlushHistNanos: a.FlushHistNanos.AddChild(scope),
SizeBasedFlushes: a.SizeBasedFlushes.AddChild(scope),
+ ParallelIOQueueNanos: a.ParallelIOQueueNanos.AddChild(scope),
+ SinkIOInflight: a.SinkIOInflight.AddChild(scope),
CommitLatency: a.CommitLatency.AddChild(scope),
ErrorRetries: a.ErrorRetries.AddChild(scope),
AdmitLatency: a.AdmitLatency.AddChild(scope),
diff --git a/pkg/ccl/changefeedccl/parallel_io.go b/pkg/ccl/changefeedccl/parallel_io.go
new file mode 100644
index 000000000000..b986da71af21
--- /dev/null
+++ b/pkg/ccl/changefeedccl/parallel_io.go
@@ -0,0 +1,282 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package changefeedccl
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
+ "github.com/cockroachdb/cockroach/pkg/util/intsets"
+ "github.com/cockroachdb/cockroach/pkg/util/retry"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+)
+
+// parallelIO allows performing blocking "IOHandler" calls in parallel.
+// IORequests implement a Keys() function returning keys on which ordering is
+// preserved.
+// Example: if the events [[a,b], [b,c], [c,d], [e,f]] are all submitted in that
+// order, [a,b] and [e,f] can be emitted concurrently while [b,c] will block
+// until [a,b] completes, then [c,d] will block until [b,c] completes. If [b,c]
+// errored, [c,d] would never be sent, and an error would be returned with [b,c]
+// in an ioResult struct sent to resultCh. After sending an error to resultCh
+// all workers are torn down and no further requests are received or handled.
+type parallelIO struct {
+ retryOpts retry.Options
+ wg ctxgroup.Group
+ metrics metricsRecorder
+ doneCh chan struct{}
+
+ ioHandler IOHandler
+
+ requestCh chan IORequest
+ resultCh chan *ioResult // readers should freeIOResult after handling result events
+}
+
+// IORequest represents an abstract unit of IO that has a set of keys upon which
+// sequential ordering of fulfillment must be enforced.
+type IORequest interface {
+ Keys() intsets.Fast
+}
+
+// ioResult stores the full request that was sent as well as an error if even
+// after retries the IOHandler was unable to succeed.
+type ioResult struct {
+ request IORequest
+ err error
+}
+
+var resultPool sync.Pool = sync.Pool{
+ New: func() interface{} {
+ return new(ioResult)
+ },
+}
+
+func newIOResult(req IORequest, err error) *ioResult {
+ res := resultPool.Get().(*ioResult)
+ res.request = req
+ res.err = err
+ return res
+}
+func freeIOResult(e *ioResult) {
+ *e = ioResult{}
+ resultPool.Put(e)
+}
+
+type queuedRequest struct {
+ req IORequest
+ admitTime time.Time
+}
+
+// IOHandler performs a blocking IO operation on an IORequest
+type IOHandler func(context.Context, IORequest) error
+
+func newParallelIO(
+ ctx context.Context,
+ retryOpts retry.Options,
+ numWorkers int,
+ handler IOHandler,
+ metrics metricsRecorder,
+) *parallelIO {
+ wg := ctxgroup.WithContext(ctx)
+ io := &parallelIO{
+ retryOpts: retryOpts,
+ wg: wg,
+ metrics: metrics,
+ ioHandler: handler,
+ requestCh: make(chan IORequest, numWorkers),
+ resultCh: make(chan *ioResult, numWorkers),
+ doneCh: make(chan struct{}),
+ }
+
+ wg.GoCtx(func(ctx context.Context) error {
+ return io.processIO(ctx, numWorkers)
+ })
+
+ return io
+}
+
+// Close stops all workers immediately and returns once they shut down. Inflight
+// requests sent to requestCh may never result in being sent to resultCh.
+func (p *parallelIO) Close() {
+ close(p.doneCh)
+ _ = p.wg.Wait()
+}
+
+// processIO starts numEmitWorkers worker threads to run the IOHandler on
+// non-conflicting IORequests each retrying according to the retryOpts, then:
+// - Reads incoming messages from requestCh, sending them to any worker if there
+// aren't any conflicting messages and queueing them up to be sent later
+// otherwise.
+// - Reads results from the workers and forwards the information to resultCh, at
+// this point also sending the first pending request that would now be sendable.
+//
+// ┌───────────┐
+// ┌►│io worker 1├──┐
+// │ └───────────┘ │
+// <-requestCh──────────┤► ... ├─┬─► resultCh<-
+// │ ▲ │ ┌───────────┐ │ │
+// │conflict │ └►│io worker n├──┘ │
+// │ │ └───────────┘ │
+// ▼ │ │check pending
+// ┌─────────┐ │no conflict │
+// │ pending ├───────┘ │
+// └─────────┘ │
+// ▲ │
+// └──────────────────────────────────┘
+//
+// The conflict checking is done via an intsets.Fast storing the union of all
+// keys currently being sent, followed by checking each pending batch's intset.
+func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error {
+ emitWithRetries := func(ctx context.Context, payload IORequest) error {
+ initialSend := true
+ return retry.WithMaxAttempts(ctx, p.retryOpts, p.retryOpts.MaxRetries+1, func() error {
+ if !initialSend {
+ p.metrics.recordInternalRetry(int64(payload.Keys().Len()), false)
+ }
+ initialSend = false
+ return p.ioHandler(ctx, payload)
+ })
+ }
+
+ // Multiple worker routines handle the IO operations, retrying when necessary.
+ workerEmitCh := make(chan IORequest, numEmitWorkers)
+ defer close(workerEmitCh)
+ workerResultCh := make(chan *ioResult, numEmitWorkers)
+
+ for i := 0; i < numEmitWorkers; i++ {
+ p.wg.GoCtx(func(ctx context.Context) error {
+ for req := range workerEmitCh {
+ result := newIOResult(req, emitWithRetries(ctx, req))
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-p.doneCh:
+ return nil
+ case workerResultCh <- result:
+ }
+ }
+ return nil
+ })
+ }
+
+ // submitIO, which sends requests to the workers, has to select on both
+ // workerEmitCh<- and <-workerResultCh, since without the <-workerResultCh
+ // there could be a deadlock where all workers are blocked on emitting results
+ // and thereby unable to accept new work. If a result is received, it also
+ // cannot immediately call handleResult on it as that could cause a re-entrant
+ // submitIO -> handleResult -> submitIO -> handleResult chain which is complex
+ // to manage. To avoid this, results are added to a pending list to be handled
+ // separately.
+ var pendingResults []*ioResult
+ submitIO := func(req IORequest) error {
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-p.doneCh:
+ return nil
+ case workerEmitCh <- req:
+ return nil
+ case res := <-workerResultCh:
+ pendingResults = append(pendingResults, res)
+ }
+ }
+ }
+
+ // The main routine keeps track of incoming and completed requests, where
+ // admitted requests yet to be completed have their Keys() tracked in an
+ // intset, and any incoming request with keys already in the intset are placed
+ // in a Queue to be sent to IO workers once the conflicting requests complete.
+ var inflight intsets.Fast
+ var pending []queuedRequest
+
+ handleResult := func(res *ioResult) error {
+ if res.err == nil {
+ // Clear out the completed keys to check for newly valid pending requests.
+ inflight.DifferenceWith(res.request.Keys())
+
+ // Check for a pending request that is now able to be sent i.e. is not
+ // conflicting with any inflight requests or any requests that arrived
+ // earlier than itself in the pending queue.
+ pendingKeys := intsets.Fast{}
+ for i, pendingReq := range pending {
+ if !inflight.Intersects(pendingReq.req.Keys()) && !pendingKeys.Intersects(pendingReq.req.Keys()) {
+ inflight.UnionWith(pendingReq.req.Keys())
+ pending = append(pending[:i], pending[i+1:]...)
+ p.metrics.recordParallelIOQueueLatency(timeutil.Since(pendingReq.admitTime))
+ if err := submitIO(pendingReq.req); err != nil {
+ return err
+ }
+ break
+ }
+
+ pendingKeys.UnionWith(pendingReq.req.Keys())
+ }
+ }
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-p.doneCh:
+ return nil
+ case p.resultCh <- res:
+ return nil
+ }
+ }
+
+ // If any yet-to-be-handled request observed so far shares any keys with an
+ // incoming request, the incoming request cannot be sent out and risk arriving
+ // earlier.
+ hasConflictingKeys := func(keys intsets.Fast) bool {
+ if inflight.Intersects(keys) {
+ return true
+ }
+ for _, pendingReq := range pending {
+ if pendingReq.req.Keys().Intersects(keys) {
+ return true
+ }
+ }
+ return false
+ }
+
+ for {
+ // Handle any results that arrived during any submitIO attempts
+ unhandled := pendingResults
+ pendingResults = nil
+ for _, res := range unhandled {
+ if err := handleResult(res); err != nil {
+ return err
+ }
+ }
+
+ select {
+ case req := <-p.requestCh:
+ if hasConflictingKeys(req.Keys()) {
+ // If a request conflicts with any currently unhandled requests, add it
+ // to the pending queue to be rechecked for validity later.
+ pending = append(pending, queuedRequest{req: req, admitTime: timeutil.Now()})
+ } else {
+ inflight.UnionWith(req.Keys())
+ if err := submitIO(req); err != nil {
+ return err
+ }
+ }
+ case res := <-workerResultCh:
+ if err := handleResult(res); err != nil {
+ return err
+ }
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-p.doneCh:
+ return nil
+ }
+ }
+}
diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go
index 70eff852d348..8309d30f27d2 100644
--- a/pkg/ccl/changefeedccl/sink.go
+++ b/pkg/ccl/changefeedccl/sink.go
@@ -10,7 +10,11 @@ package changefeedccl
import (
"context"
+ "encoding/json"
+ "math"
"net/url"
+ "runtime"
+ "strconv"
"strings"
"time"
@@ -19,13 +23,19 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
+ "github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+ "github.com/cockroachdb/cockroach/pkg/util"
+ "github.com/cockroachdb/cockroach/pkg/util/admission"
+ "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
@@ -146,6 +156,16 @@ func getAndDialSink(
return sink, sink.Dial()
}
+// NewWebhookSinkEnabled determines whether or not the refactored Webhook sink
+// or the deprecated sink should be used.
+var NewWebhookSinkEnabled = settings.RegisterBoolSetting(
+ settings.TenantWritable,
+ "changefeed.new_webhook_sink_enabled",
+ "if enabled, this setting enables a new implementation of the webhook sink"+
+ " that allows for a much higher throughput",
+ util.ConstantWithMetamorphicTestBool("changefeed.new_webhook_sink_enabled", false),
+)
+
func getSink(
ctx context.Context,
serverCfg *execinfra.ServerConfig,
@@ -207,10 +227,17 @@ func getSink(
if err != nil {
return nil, err
}
- return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) {
- return makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts,
- defaultWorkerCount(), timeutil.DefaultTimeSource{}, metricsBuilder)
- })
+ if NewWebhookSinkEnabled.Get(&serverCfg.Settings.SV) {
+ return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) {
+ return makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts,
+ numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{}, metricsBuilder)
+ })
+ } else {
+ return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) {
+ return makeDeprecatedWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts,
+ defaultWorkerCount(), timeutil.DefaultTimeSource{}, metricsBuilder)
+ })
+ }
case isPubsubSink(u):
// TODO: add metrics to pubsubsink
return MakePubsubSink(ctx, u, encodingOpts, AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered))
@@ -606,12 +633,10 @@ func (n *nullSink) Dial() error {
}
// safeSink wraps an EventSink in a mutex so it's methods are
-// thread safe. It also has a beforeFlush hook which is called
-// at the beginning of safeSink.Flush().
+// thread safe.
type safeSink struct {
syncutil.Mutex
- beforeFlush func(ctx context.Context) error
- wrapped EventSink
+ wrapped EventSink
}
var _ EventSink = (*safeSink)(nil)
@@ -645,9 +670,6 @@ func (s *safeSink) EmitRow(
}
func (s *safeSink) Flush(ctx context.Context) error {
- if err := s.beforeFlush(ctx); err != nil {
- return err
- }
s.Lock()
defer s.Unlock()
return s.wrapped.Flush(ctx)
@@ -673,3 +695,166 @@ type SinkWithEncoder interface {
Flush(ctx context.Context) error
}
+
+// proper JSON schema for sink config:
+//
+// {
+// "Flush": {
+// "Messages": ...,
+// "Bytes": ...,
+// "Frequency": ...,
+// },
+// "Retry": {
+// "Max": ...,
+// "Backoff": ...,
+// }
+// }
+type sinkJSONConfig struct {
+ Flush sinkBatchConfig `json:",omitempty"`
+ Retry sinkRetryConfig `json:",omitempty"`
+}
+
+type sinkBatchConfig struct {
+ Bytes, Messages int `json:",omitempty"`
+ Frequency jsonDuration `json:",omitempty"`
+}
+
+// wrapper structs to unmarshal json, retry.Options will be the actual config
+type sinkRetryConfig struct {
+ Max jsonMaxRetries `json:",omitempty"`
+ Backoff jsonDuration `json:",omitempty"`
+}
+
+func defaultRetryConfig() retry.Options {
+ opts := retry.Options{
+ InitialBackoff: 500 * time.Millisecond,
+ MaxRetries: 3,
+ Multiplier: 2,
+ }
+ // max backoff should be initial * 2 ^ maxRetries
+ opts.MaxBackoff = opts.InitialBackoff * time.Duration(int(math.Pow(2.0, float64(opts.MaxRetries))))
+ return opts
+}
+
+func getSinkConfigFromJson(
+ jsonStr changefeedbase.SinkSpecificJSONConfig,
+) (batchCfg sinkBatchConfig, retryCfg retry.Options, err error) {
+ retryCfg = defaultRetryConfig()
+
+ var cfg = sinkJSONConfig{}
+ cfg.Retry.Max = jsonMaxRetries(retryCfg.MaxRetries)
+ cfg.Retry.Backoff = jsonDuration(retryCfg.InitialBackoff)
+ if jsonStr != `` {
+ // set retry defaults to be overridden if included in JSON
+ if err = json.Unmarshal([]byte(jsonStr), &cfg); err != nil {
+ return batchCfg, retryCfg, errors.Wrapf(err, "error unmarshalling json")
+ }
+ }
+
+ // don't support negative values
+ if cfg.Flush.Messages < 0 || cfg.Flush.Bytes < 0 || cfg.Flush.Frequency < 0 ||
+ cfg.Retry.Max < 0 || cfg.Retry.Backoff < 0 {
+ return batchCfg, retryCfg, errors.Errorf("invalid sink config, all values must be non-negative")
+ }
+
+ // errors if other batch values are set, but frequency is not
+ if (cfg.Flush.Messages > 0 || cfg.Flush.Bytes > 0) && cfg.Flush.Frequency == 0 {
+ return batchCfg, retryCfg, errors.Errorf("invalid sink config, Flush.Frequency is not set, messages may never be sent")
+ }
+
+ retryCfg.MaxRetries = int(cfg.Retry.Max)
+ retryCfg.InitialBackoff = time.Duration(cfg.Retry.Backoff)
+ return cfg.Flush, retryCfg, nil
+}
+
+type jsonMaxRetries int
+
+func (j *jsonMaxRetries) UnmarshalJSON(b []byte) error {
+ var i int64
+ // try to parse as int
+ i, err := strconv.ParseInt(string(b), 10, 64)
+ if err == nil {
+ if i <= 0 {
+ return errors.Errorf("Retry.Max must be a positive integer. use 'inf' for infinite retries.")
+ }
+ *j = jsonMaxRetries(i)
+ } else {
+ // if that fails, try to parse as string (only accept 'inf')
+ var s string
+ // using unmarshal here to remove quotes around the string
+ if err := json.Unmarshal(b, &s); err != nil {
+ return errors.Errorf("Retry.Max must be either a positive int or 'inf' for infinite retries.")
+ }
+ if strings.ToLower(s) == "inf" {
+ // if user wants infinite retries, set to zero as retry.Options interprets this as infinity
+ *j = 0
+ } else if n, err := strconv.Atoi(s); err == nil { // also accept ints as strings
+ *j = jsonMaxRetries(n)
+ } else {
+ return errors.Errorf("Retry.Max must be either a positive int or 'inf' for infinite retries.")
+ }
+ }
+ return nil
+}
+
+func numSinkIOWorkers(cfg *execinfra.ServerConfig) int {
+ numWorkers := changefeedbase.SinkIOWorkers.Get(&cfg.Settings.SV)
+ if numWorkers > 0 {
+ return int(numWorkers)
+ }
+
+ idealNumber := runtime.GOMAXPROCS(0)
+ if idealNumber < 1 {
+ return 1
+ }
+ if idealNumber > 32 {
+ return 32
+ }
+ return idealNumber
+}
+
+func newCPUPacerFactory(ctx context.Context, cfg *execinfra.ServerConfig) func() *admission.Pacer {
+ var prevPacer *admission.Pacer
+ var prevRequestUnit time.Duration
+
+ return func() *admission.Pacer {
+ pacerRequestUnit := changefeedbase.SinkPacerRequestSize.Get(&cfg.Settings.SV)
+ enablePacer := changefeedbase.PerEventElasticCPUControlEnabled.Get(&cfg.Settings.SV)
+
+ if enablePacer && prevPacer != nil && prevRequestUnit == pacerRequestUnit {
+ return prevPacer
+ }
+
+ var pacer *admission.Pacer = nil
+ if enablePacer {
+ tenantID, ok := roachpb.ClientTenantFromContext(ctx)
+ if !ok {
+ tenantID = roachpb.SystemTenantID
+ }
+
+ pacer = cfg.AdmissionPacerFactory.NewPacer(
+ pacerRequestUnit,
+ admission.WorkInfo{
+ TenantID: tenantID,
+ Priority: admissionpb.BulkNormalPri,
+ CreateTime: timeutil.Now().UnixNano(),
+ BypassAdmission: false,
+ },
+ )
+ }
+
+ prevPacer = pacer
+ prevRequestUnit = pacerRequestUnit
+
+ return pacer
+ }
+}
+
+func nilPacerFactory() *admission.Pacer {
+ return nil
+}
+
+func sinkSupportsConcurrentEmits(sink EventSink) bool {
+ _, ok := sink.(*batchingSink)
+ return ok
+}
diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go
index eb663f021efc..25ae999c29ea 100644
--- a/pkg/ccl/changefeedccl/sink_test.go
+++ b/pkg/ccl/changefeedccl/sink_test.go
@@ -764,3 +764,86 @@ func TestKafkaSinkTracksMemory(t *testing.T) {
require.EqualValues(t, 0, p.outstanding())
require.EqualValues(t, 0, pool.used())
}
+
+func TestSinkConfigParsing(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ t.Run("handles valid types", func(t *testing.T) {
+ opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234, "Frequency": "3s", "Bytes":30}, "Retry": {"Max": 5, "Backoff": "3h"}}`)
+ batch, retry, err := getSinkConfigFromJson(opts)
+ require.NoError(t, err)
+ require.Equal(t, batch, sinkBatchConfig{
+ Bytes: 30,
+ Messages: 1234,
+ Frequency: jsonDuration(3 * time.Second),
+ })
+ require.Equal(t, retry.MaxRetries, 5)
+ require.Equal(t, retry.InitialBackoff, 3*time.Hour)
+
+ // Max accepts both values and specifically the string "inf"
+ opts = changefeedbase.SinkSpecificJSONConfig(`{"Retry": {"Max": "inf"}}`)
+ _, retry, err = getSinkConfigFromJson(opts)
+ require.NoError(t, err)
+ require.Equal(t, retry.MaxRetries, 0)
+ })
+
+ t.Run("provides retry defaults", func(t *testing.T) {
+ defaultRetry := defaultRetryConfig()
+
+ opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234, "Frequency": "3s"}}`)
+ _, retry, err := getSinkConfigFromJson(opts)
+ require.NoError(t, err)
+
+ require.Equal(t, retry.MaxRetries, defaultRetry.MaxRetries)
+ require.Equal(t, retry.InitialBackoff, defaultRetry.InitialBackoff)
+
+ opts = changefeedbase.SinkSpecificJSONConfig(`{"Retry": {"Max": "inf"}}`)
+ _, retry, err = getSinkConfigFromJson(opts)
+ require.NoError(t, err)
+ require.Equal(t, retry.MaxRetries, 0)
+ require.Equal(t, retry.InitialBackoff, defaultRetry.InitialBackoff)
+ })
+
+ t.Run("errors on invalid configuration", func(t *testing.T) {
+ opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": -1234, "Frequency": "3s"}}`)
+ _, _, err := getSinkConfigFromJson(opts)
+ require.ErrorContains(t, err, "invalid sink config, all values must be non-negative")
+
+ opts = changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234, "Frequency": "-3s"}}`)
+ _, _, err = getSinkConfigFromJson(opts)
+ require.ErrorContains(t, err, "invalid sink config, all values must be non-negative")
+
+ opts = changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 10}}`)
+ _, _, err = getSinkConfigFromJson(opts)
+ require.ErrorContains(t, err, "invalid sink config, Flush.Frequency is not set, messages may never be sent")
+ })
+
+ t.Run("errors on invalid type", func(t *testing.T) {
+ opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": "1234", "Frequency": "3s", "Bytes":30}}`)
+ _, _, err := getSinkConfigFromJson(opts)
+ require.ErrorContains(t, err, "Flush.Messages of type int")
+
+ opts = changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234, "Frequency": "3s", "Bytes":"30"}}`)
+ _, _, err = getSinkConfigFromJson(opts)
+ require.ErrorContains(t, err, "Flush.Bytes of type int")
+
+ opts = changefeedbase.SinkSpecificJSONConfig(`{"Retry": {"Max": true}}`)
+ _, _, err = getSinkConfigFromJson(opts)
+ require.ErrorContains(t, err, "Retry.Max must be either a positive int or 'inf'")
+
+ opts = changefeedbase.SinkSpecificJSONConfig(`{"Retry": {"Max": "not-inf"}}`)
+ _, _, err = getSinkConfigFromJson(opts)
+ require.ErrorContains(t, err, "Retry.Max must be either a positive int or 'inf'")
+ })
+
+ t.Run("errors on malformed json", func(t *testing.T) {
+ opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234 "Frequency": "3s"}}`)
+ _, _, err := getSinkConfigFromJson(opts)
+ require.ErrorContains(t, err, "invalid character '\"'")
+
+ opts = changefeedbase.SinkSpecificJSONConfig(`string`)
+ _, _, err = getSinkConfigFromJson(opts)
+ require.ErrorContains(t, err, "invalid character 's' looking for beginning of value")
+ })
+}
diff --git a/pkg/ccl/changefeedccl/sink_webhook.go b/pkg/ccl/changefeedccl/sink_webhook.go
index 09fd6cb508a2..f2ca73d125de 100644
--- a/pkg/ccl/changefeedccl/sink_webhook.go
+++ b/pkg/ccl/changefeedccl/sink_webhook.go
@@ -17,11 +17,9 @@ import (
"fmt"
"hash/crc32"
"io"
- "math"
"net"
"net/http"
"net/url"
- "strconv"
"strings"
"time"
@@ -36,23 +34,7 @@ import (
"github.com/cockroachdb/errors"
)
-const (
- applicationTypeJSON = `application/json`
- applicationTypeCSV = `text/csv`
- authorizationHeader = `Authorization`
-)
-
-func isWebhookSink(u *url.URL) bool {
- switch u.Scheme {
- // allow HTTP here but throw an error later to make it clear HTTPS is required
- case changefeedbase.SinkSchemeWebhookHTTP, changefeedbase.SinkSchemeWebhookHTTPS:
- return true
- default:
- return false
- }
-}
-
-type webhookSink struct {
+type deprecatedWebhookSink struct {
// Webhook configuration.
parallelism int
retryCfg retry.Options
@@ -80,11 +62,11 @@ type webhookSink struct {
workerCtx context.Context
workerGroup ctxgroup.Group
exitWorkers func() // Signaled to shut down all workers.
- eventsChans []chan []messagePayload
+ eventsChans []chan []deprecatedMessagePayload
metrics metricsRecorder
}
-func (s *webhookSink) getConcreteType() sinkType {
+func (s *deprecatedWebhookSink) getConcreteType() sinkType {
return sinkTypeWebhook
}
@@ -100,7 +82,7 @@ type encodedPayload struct {
mvcc hlc.Timestamp
}
-func encodePayloadJSONWebhook(messages []messagePayload) (encodedPayload, error) {
+func encodePayloadJSONWebhook(messages []deprecatedMessagePayload) (encodedPayload, error) {
result := encodedPayload{
emitTime: timeutil.Now(),
}
@@ -129,7 +111,7 @@ func encodePayloadJSONWebhook(messages []messagePayload) (encodedPayload, error)
return result, err
}
-func encodePayloadCSVWebhook(messages []messagePayload) (encodedPayload, error) {
+func encodePayloadCSVWebhook(messages []deprecatedMessagePayload) (encodedPayload, error) {
result := encodedPayload{
emitTime: timeutil.Now(),
}
@@ -150,7 +132,7 @@ func encodePayloadCSVWebhook(messages []messagePayload) (encodedPayload, error)
return result, nil
}
-type messagePayload struct {
+type deprecatedMessagePayload struct {
// Payload message fields.
key []byte
val []byte
@@ -162,15 +144,15 @@ type messagePayload struct {
// webhookMessage contains either messagePayload or a flush request.
type webhookMessage struct {
flushDone *chan struct{}
- payload messagePayload
+ payload deprecatedMessagePayload
}
type batch struct {
- buffer []messagePayload
+ buffer []deprecatedMessagePayload
bufferBytes int
}
-func (b *batch) addToBuffer(m messagePayload) {
+func (b *batch) addToBuffer(m deprecatedMessagePayload) {
b.bufferBytes += len(m.val)
b.buffer = append(b.buffer, m)
}
@@ -185,36 +167,6 @@ type batchConfig struct {
Frequency jsonDuration `json:",omitempty"`
}
-type jsonMaxRetries int
-
-func (j *jsonMaxRetries) UnmarshalJSON(b []byte) error {
- var i int64
- // try to parse as int
- i, err := strconv.ParseInt(string(b), 10, 64)
- if err == nil {
- if i <= 0 {
- return errors.Errorf("max retry count must be a positive integer. use 'inf' for infinite retries.")
- }
- *j = jsonMaxRetries(i)
- } else {
- // if that fails, try to parse as string (only accept 'inf')
- var s string
- // using unmarshal here to remove quotes around the string
- if err := json.Unmarshal(b, &s); err != nil {
- return err
- }
- if strings.ToLower(s) == "inf" {
- // if used wants infinite retries, set to zero as retry.Options interprets this as infinity
- *j = 0
- } else if n, err := strconv.Atoi(s); err == nil { // also accept ints as strings
- *j = jsonMaxRetries(n)
- } else {
- return errors.Errorf("max retries must be either a positive int or 'inf' for infinite retries.")
- }
- }
- return nil
-}
-
// wrapper structs to unmarshal json, retry.Options will be the actual config
type retryConfig struct {
Max jsonMaxRetries `json:",omitempty"`
@@ -239,7 +191,7 @@ type webhookSinkConfig struct {
Retry retryConfig `json:",omitempty"`
}
-func (s *webhookSink) getWebhookSinkConfig(
+func (s *deprecatedWebhookSink) getWebhookSinkConfig(
jsonStr changefeedbase.SinkSpecificJSONConfig,
) (batchCfg batchConfig, retryCfg retry.Options, err error) {
retryCfg = defaultRetryConfig()
@@ -257,12 +209,12 @@ func (s *webhookSink) getWebhookSinkConfig(
// don't support negative values
if cfg.Flush.Messages < 0 || cfg.Flush.Bytes < 0 || cfg.Flush.Frequency < 0 ||
cfg.Retry.Max < 0 || cfg.Retry.Backoff < 0 {
- return batchCfg, retryCfg, errors.Errorf("invalid option value %s, all config values must be non-negative", changefeedbase.OptWebhookSinkConfig)
+ return batchCfg, retryCfg, errors.Errorf("invalid sink config, all values must be non-negative")
}
// errors if other batch values are set, but frequency is not
if (cfg.Flush.Messages > 0 || cfg.Flush.Bytes > 0) && cfg.Flush.Frequency == 0 {
- return batchCfg, retryCfg, errors.Errorf("invalid option value %s, flush frequency is not set, messages may never be sent", changefeedbase.OptWebhookSinkConfig)
+ return batchCfg, retryCfg, errors.Errorf("invalid sink config, Flush.Frequency is not set, messages may never be sent")
}
retryCfg.MaxRetries = int(cfg.Retry.Max)
@@ -270,7 +222,7 @@ func (s *webhookSink) getWebhookSinkConfig(
return cfg.Flush, retryCfg, nil
}
-func makeWebhookSink(
+func makeDeprecatedWebhookSink(
ctx context.Context,
u sinkURL,
encodingOpts changefeedbase.EncodingOptions,
@@ -312,7 +264,7 @@ func makeWebhookSink(
ctx, cancel := context.WithCancel(ctx)
- sink := &webhookSink{
+ sink := &deprecatedWebhookSink{
workerCtx: ctx,
authHeader: opts.AuthHeader,
exitWorkers: cancel,
@@ -329,7 +281,7 @@ func makeWebhookSink(
}
// TODO(yevgeniy): Establish HTTP connection in Dial().
- sink.client, err = makeWebhookClient(u, connTimeout)
+ sink.client, err = deprecatedMakeWebhookClient(u, connTimeout)
if err != nil {
return nil, err
}
@@ -350,7 +302,7 @@ func makeWebhookSink(
return sink, nil
}
-func makeWebhookClient(u sinkURL, timeout time.Duration) (*httputil.Client, error) {
+func deprecatedMakeWebhookClient(u sinkURL, timeout time.Duration) (*httputil.Client, error) {
client := &httputil.Client{
Client: &http.Client{
Timeout: timeout,
@@ -417,30 +369,19 @@ func makeWebhookClient(u sinkURL, timeout time.Duration) (*httputil.Client, erro
return client, nil
}
-func defaultRetryConfig() retry.Options {
- opts := retry.Options{
- InitialBackoff: 500 * time.Millisecond,
- MaxRetries: 3,
- Multiplier: 2,
- }
- // max backoff should be initial * 2 ^ maxRetries
- opts.MaxBackoff = opts.InitialBackoff * time.Duration(int(math.Pow(2.0, float64(opts.MaxRetries))))
- return opts
-}
-
// defaultWorkerCount() is the number of CPU's on the machine
func defaultWorkerCount() int {
return system.NumCPU()
}
-func (s *webhookSink) Dial() error {
+func (s *deprecatedWebhookSink) Dial() error {
s.setupWorkers()
return nil
}
-func (s *webhookSink) setupWorkers() {
+func (s *deprecatedWebhookSink) setupWorkers() {
// setup events channels to send to workers and the worker group
- s.eventsChans = make([]chan []messagePayload, s.parallelism)
+ s.eventsChans = make([]chan []deprecatedMessagePayload, s.parallelism)
s.workerGroup = ctxgroup.WithContext(s.workerCtx)
s.batchChan = make(chan webhookMessage)
@@ -455,7 +396,7 @@ func (s *webhookSink) setupWorkers() {
return nil
})
for i := 0; i < s.parallelism; i++ {
- s.eventsChans[i] = make(chan []messagePayload)
+ s.eventsChans[i] = make(chan []deprecatedMessagePayload)
j := i
s.workerGroup.GoCtx(func(ctx context.Context) error {
s.workerLoop(j)
@@ -464,7 +405,7 @@ func (s *webhookSink) setupWorkers() {
}
}
-func (s *webhookSink) shouldSendBatch(b batch) bool {
+func (s *deprecatedWebhookSink) shouldSendBatch(b batch) bool {
// similar to sarama, send batch if:
// everything is zero (default)
// any one of the conditions are met UNLESS the condition is zero which means never batch
@@ -483,8 +424,8 @@ func (s *webhookSink) shouldSendBatch(b batch) bool {
}
}
-func (s *webhookSink) splitAndSendBatch(batch []messagePayload) error {
- workerBatches := make([][]messagePayload, s.parallelism)
+func (s *deprecatedWebhookSink) splitAndSendBatch(batch []deprecatedMessagePayload) error {
+ workerBatches := make([][]deprecatedMessagePayload, s.parallelism)
for _, msg := range batch {
// split batch into per-worker batches
i := s.workerIndex(msg.key)
@@ -504,7 +445,7 @@ func (s *webhookSink) splitAndSendBatch(batch []messagePayload) error {
}
// flushWorkers sends flush request to each worker and waits for each one to acknowledge.
-func (s *webhookSink) flushWorkers(done chan struct{}) error {
+func (s *deprecatedWebhookSink) flushWorkers(done chan struct{}) error {
for i := 0; i < len(s.eventsChans); i++ {
// Ability to write a nil message to events channel indicates that
// the worker has processed all other messages.
@@ -525,7 +466,7 @@ func (s *webhookSink) flushWorkers(done chan struct{}) error {
// batchWorker ingests messages from EmitRow into a batch and splits them into
// per-worker batches to be sent separately
-func (s *webhookSink) batchWorker() {
+func (s *deprecatedWebhookSink) batchWorker() {
var batchTracker batch
batchTimer := s.ts.NewTimer()
defer batchTimer.Stop()
@@ -578,7 +519,7 @@ func (s *webhookSink) batchWorker() {
}
}
-func (s *webhookSink) workerLoop(workerIndex int) {
+func (s *deprecatedWebhookSink) workerLoop(workerIndex int) {
for {
select {
case <-s.workerCtx.Done():
@@ -613,14 +554,14 @@ func (s *webhookSink) workerLoop(workerIndex int) {
}
}
-func (s *webhookSink) sendMessageWithRetries(ctx context.Context, reqBody []byte) error {
+func (s *deprecatedWebhookSink) sendMessageWithRetries(ctx context.Context, reqBody []byte) error {
requestFunc := func() error {
return s.sendMessage(ctx, reqBody)
}
return retry.WithMaxAttempts(ctx, s.retryCfg, s.retryCfg.MaxRetries+1, requestFunc)
}
-func (s *webhookSink) sendMessage(ctx context.Context, reqBody []byte) error {
+func (s *deprecatedWebhookSink) sendMessage(ctx context.Context, reqBody []byte) error {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url.String(), bytes.NewReader(reqBody))
if err != nil {
return err
@@ -658,13 +599,13 @@ func (s *webhookSink) sendMessage(ctx context.Context, reqBody []byte) error {
// deterministically assigned to the same worker. Since we have a channel per
// worker, we can ensure per-worker ordering and therefore guarantee per-key
// ordering.
-func (s *webhookSink) workerIndex(key []byte) uint32 {
+func (s *deprecatedWebhookSink) workerIndex(key []byte) uint32 {
return crc32.ChecksumIEEE(key) % uint32(s.parallelism)
}
// exitWorkersWithError saves the first error message encountered by webhook workers,
// and requests all workers to terminate.
-func (s *webhookSink) exitWorkersWithError(err error) {
+func (s *deprecatedWebhookSink) exitWorkersWithError(err error) {
// errChan has buffer size 1, first error will be saved to the buffer and
// subsequent errors will be ignored
select {
@@ -675,7 +616,7 @@ func (s *webhookSink) exitWorkersWithError(err error) {
}
// sinkError checks to see if any errors occurred inside workers go routines.
-func (s *webhookSink) sinkError() error {
+func (s *deprecatedWebhookSink) sinkError() error {
select {
case err := <-s.errChan:
return err
@@ -684,7 +625,7 @@ func (s *webhookSink) sinkError() error {
}
}
-func (s *webhookSink) EmitRow(
+func (s *deprecatedWebhookSink) EmitRow(
ctx context.Context,
topic TopicDescriptor,
key, value []byte,
@@ -702,7 +643,7 @@ func (s *webhookSink) EmitRow(
case err := <-s.errChan:
return err
case s.batchChan <- webhookMessage{
- payload: messagePayload{
+ payload: deprecatedMessagePayload{
key: key,
val: value,
alloc: alloc,
@@ -714,7 +655,7 @@ func (s *webhookSink) EmitRow(
return nil
}
-func (s *webhookSink) EmitResolvedTimestamp(
+func (s *deprecatedWebhookSink) EmitResolvedTimestamp(
ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
) error {
defer s.metrics.recordResolvedCallback()()
@@ -745,7 +686,7 @@ func (s *webhookSink) EmitResolvedTimestamp(
return nil
}
-func (s *webhookSink) Flush(ctx context.Context) error {
+func (s *deprecatedWebhookSink) Flush(ctx context.Context) error {
s.metrics.recordFlushRequestCallback()()
// Send flush request.
@@ -768,7 +709,7 @@ func (s *webhookSink) Flush(ctx context.Context) error {
}
}
-func (s *webhookSink) Close() error {
+func (s *deprecatedWebhookSink) Close() error {
s.exitWorkers()
// ignore errors here since we're closing the sink anyway
_ = s.workerGroup.Wait()
diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go
index 199734be6bf5..65356260c7e6 100644
--- a/pkg/ccl/changefeedccl/sink_webhook_test.go
+++ b/pkg/ccl/changefeedccl/sink_webhook_test.go
@@ -15,6 +15,7 @@ import (
"net/http"
"net/url"
"strings"
+ "sync/atomic"
"testing"
"time"
@@ -83,7 +84,7 @@ func setupWebhookSinkWithDetails(
if err != nil {
return nil, err
}
- sinkSrc, err := makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, sinkOpts, parallelism, source, nilMetricsRecorderBuilder)
+ sinkSrc, err := makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, sinkOpts, parallelism, nilPacerFactory, source, nilMetricsRecorderBuilder)
if err != nil {
return nil, err
}
@@ -172,8 +173,6 @@ func TestWebhookSink(t *testing.T) {
require.NoError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc))
require.Regexp(t, "x509", sinkSrcNoCert.Flush(context.Background()))
- require.EqualError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc),
- `context canceled`)
params.Set(changefeedbase.SinkParamSkipTLSVerify, "true")
sinkDestHost.RawQuery = params.Encode()
@@ -191,8 +190,6 @@ func TestWebhookSink(t *testing.T) {
err = sinkSrc.Flush(context.Background())
require.Error(t, err)
require.Contains(t, err.Error(), fmt.Sprintf(`Post "%s":`, sinkDest.URL()))
- require.EqualError(t, sinkSrc.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc),
- `context canceled`)
sinkDestHTTP, err := cdctest.StartMockWebhookSinkInsecure()
require.NoError(t, err)
@@ -207,8 +204,6 @@ func TestWebhookSink(t *testing.T) {
require.EqualError(t, sinkSrcWrongProtocol.Flush(context.Background()),
fmt.Sprintf(`Post "%s": http: server gave HTTP response to HTTPS client`, fmt.Sprintf("https://%s", strings.TrimPrefix(sinkDestHTTP.URL(),
"http://"))))
- require.EqualError(t, sinkSrcWrongProtocol.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc),
- `context canceled`)
sinkDestSecure, err := cdctest.StartMockWebhookSinkSecure(cert)
require.NoError(t, err)
@@ -230,6 +225,7 @@ func TestWebhookSink(t *testing.T) {
Opts: opts,
}
+ require.NoError(t, sinkSrc.Close())
sinkSrc, err = setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{})
require.NoError(t, err)
@@ -305,8 +301,6 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) {
require.NoError(t, sinkSrcNoCreds.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc))
require.EqualError(t, sinkSrcNoCreds.Flush(context.Background()), "401 Unauthorized: ")
- require.EqualError(t, sinkSrcNoCreds.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc),
- `context canceled`)
// wrong credentials should result in a 401 as well
var wrongAuthHeader string
@@ -318,8 +312,6 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) {
require.NoError(t, sinkSrcWrongCreds.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc))
require.EqualError(t, sinkSrcWrongCreds.Flush(context.Background()), "401 Unauthorized: ")
- require.EqualError(t, sinkSrcWrongCreds.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc),
- `context canceled`)
require.NoError(t, sinkSrc.Close())
require.NoError(t, sinkSrcNoCreds.Close())
@@ -603,6 +595,13 @@ func TestWebhookSinkConfig(t *testing.T) {
sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, mt)
require.NoError(t, err)
+ batchingSink, ok := sinkSrc.(*batchingSink)
+ require.True(t, ok)
+ var appendCount int32 = 0
+ batchingSink.knobs.OnAppend = func(event *rowEvent) {
+ atomic.AddInt32(&appendCount, 1)
+ }
+
// send incomplete batch
require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc()))
require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc()))
@@ -611,11 +610,10 @@ func TestWebhookSinkConfig(t *testing.T) {
require.Equal(t, sinkDest.Latest(), "")
testutils.SucceedsSoon(t, func() error {
- // wait for the timer in batch worker to be set (1 hour from now, as specified by config) before advancing time.
- if len(mt.Timers()) == 1 && mt.Timers()[0] == mt.Now().Add(time.Hour) {
+ if atomic.LoadInt32(&appendCount) >= 2 {
return nil
}
- return errors.New("Waiting for timer to be created by batch worker")
+ return errors.New("Waiting for rows to be buffered")
})
mt.Advance(time.Hour)
require.NoError(t, sinkSrc.Flush(context.Background()))
diff --git a/pkg/ccl/changefeedccl/sink_webhook_v2.go b/pkg/ccl/changefeedccl/sink_webhook_v2.go
new file mode 100644
index 000000000000..6403c67c7a48
--- /dev/null
+++ b/pkg/ccl/changefeedccl/sink_webhook_v2.go
@@ -0,0 +1,381 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package changefeedccl
+
+import (
+ "bytes"
+ "context"
+ "crypto/tls"
+ "crypto/x509"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "net/url"
+ "strings"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
+ "github.com/cockroachdb/cockroach/pkg/util/admission"
+ "github.com/cockroachdb/cockroach/pkg/util/httputil"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/cockroachdb/errors"
+)
+
+// Content-Type values set on outgoing webhook requests for the JSON and CSV
+// formats, and the name of the HTTP header used to carry the auth header value.
+const (
+ applicationTypeJSON = `application/json`
+ applicationTypeCSV = `text/csv`
+ authorizationHeader = `Authorization`
+)
+
+// isWebhookSink reports whether the URL's scheme is one of the webhook sink
+// schemes.
+func isWebhookSink(u *url.URL) bool {
+	// The plain-HTTP webhook scheme is deliberately accepted here; it is
+	// rejected later with an explicit error to make it clear HTTPS is required.
+	scheme := u.Scheme
+	return scheme == changefeedbase.SinkSchemeWebhookHTTP ||
+		scheme == changefeedbase.SinkSchemeWebhookHTTPS
+}
+
+// webhookSinkClient is the SinkClient implementation that delivers changefeed
+// batches to a webhook endpoint as HTTP POST requests.
+type webhookSinkClient struct {
+ // ctx is the context attached to every request built by makePayloadForBytes.
+ ctx context.Context
+ // format selects the Content-Type header and the kind of batch buffer used.
+ format changefeedbase.FormatType
+ // url is the destination URL, with the TLS-related query parameters removed.
+ url sinkURL
+ // authHeader, when non-empty, is sent as the Authorization header.
+ authHeader string
+ // batchCfg holds the thresholds consulted by shouldFlush.
+ batchCfg sinkBatchConfig
+ // client is the HTTP client used to send requests.
+ client *httputil.Client
+}
+
+var _ SinkClient = (*webhookSinkClient)(nil)
+
+// makeWebhookSinkClient validates the sink options and builds a
+// webhookSinkClient for the given URL. It returns an error if validation fails,
+// if the HTTP client cannot be constructed, or if the URL cannot be re-parsed.
+//
+// parallelism is forwarded to makeWebhookClient, where it bounds the number of
+// connections per host.
+func makeWebhookSinkClient(
+ ctx context.Context,
+ u sinkURL,
+ encodingOpts changefeedbase.EncodingOptions,
+ opts changefeedbase.WebhookSinkOptions,
+ batchCfg sinkBatchConfig,
+ parallelism int,
+) (SinkClient, error) {
+ err := validateWebhookOpts(u, encodingOpts, opts)
+ if err != nil {
+ return nil, err
+ }
+
+ // Strip the `webhook-` prefix so the remaining scheme is what gets dialed.
+ // NOTE(review): sinkURL appears to wrap a *url.URL (see sinkURL{URL: ...}
+ // below), so this assignment presumably also changes the caller's URL —
+ // confirm that is intended.
+ u.Scheme = strings.TrimPrefix(u.Scheme, `webhook-`)
+
+ sinkClient := &webhookSinkClient{
+ ctx: ctx,
+ authHeader: opts.AuthHeader,
+ format: encodingOpts.Format,
+ batchCfg: batchCfg,
+ }
+
+ // A nil ClientTimeout leaves connTimeout at zero.
+ var connTimeout time.Duration
+ if opts.ClientTimeout != nil {
+ connTimeout = *opts.ClientTimeout
+ }
+ sinkClient.client, err = makeWebhookClient(u, connTimeout, parallelism)
+ if err != nil {
+ return nil, err
+ }
+
+ // remove known query params from sink URL before setting in sink config
+ sinkURLParsed, err := url.Parse(u.String())
+ if err != nil {
+ return nil, err
+ }
+ params := sinkURLParsed.Query()
+ params.Del(changefeedbase.SinkParamSkipTLSVerify)
+ params.Del(changefeedbase.SinkParamCACert)
+ params.Del(changefeedbase.SinkParamClientCert)
+ params.Del(changefeedbase.SinkParamClientKey)
+ sinkURLParsed.RawQuery = params.Encode()
+ sinkClient.url = sinkURL{URL: sinkURLParsed}
+
+ return sinkClient, nil
+}
+
+// makeWebhookClient builds the HTTP client used to talk to the webhook
+// endpoint. The TLS configuration is derived from query parameters on u:
+// whether to skip certificate verification, an extra CA certificate, and a
+// client certificate/key pair.
+//
+// timeout is used both as the overall client timeout and as the dial timeout.
+// parallelism caps the connections (and idle connections) kept per host.
+//
+// An error is returned if a parameter cannot be decoded, if the CA data cannot
+// be parsed, if only one of client cert/key is given, or if the pair is invalid.
+func makeWebhookClient(
+ u sinkURL, timeout time.Duration, parallelism int,
+) (*httputil.Client, error) {
+ client := &httputil.Client{
+ Client: &http.Client{
+ Timeout: timeout,
+ Transport: &http.Transport{
+ DialContext: (&net.Dialer{Timeout: timeout}).DialContext,
+ MaxConnsPerHost: parallelism,
+ MaxIdleConnsPerHost: parallelism,
+ IdleConnTimeout: time.Minute,
+ ForceAttemptHTTP2: true,
+ },
+ },
+ }
+
+ // Values read from the sink URL's query parameters.
+ dialConfig := struct {
+ tlsSkipVerify bool
+ caCert []byte
+ clientCert []byte
+ clientKey []byte
+ }{}
+
+ // The transport was created just above, so the type assertion cannot fail.
+ transport := client.Transport.(*http.Transport)
+
+ if _, err := u.consumeBool(changefeedbase.SinkParamSkipTLSVerify, &dialConfig.tlsSkipVerify); err != nil {
+ return nil, err
+ }
+ if err := u.decodeBase64(changefeedbase.SinkParamCACert, &dialConfig.caCert); err != nil {
+ return nil, err
+ }
+ if err := u.decodeBase64(changefeedbase.SinkParamClientCert, &dialConfig.clientCert); err != nil {
+ return nil, err
+ }
+ if err := u.decodeBase64(changefeedbase.SinkParamClientKey, &dialConfig.clientKey); err != nil {
+ return nil, err
+ }
+
+ transport.TLSClientConfig = &tls.Config{
+ InsecureSkipVerify: dialConfig.tlsSkipVerify,
+ }
+
+ // A custom CA is appended to the system pool; if no system pool is
+ // available, an empty pool is used instead.
+ if dialConfig.caCert != nil {
+ caCertPool, err := x509.SystemCertPool()
+ if err != nil {
+ return nil, errors.Wrap(err, "could not load system root CA pool")
+ }
+ if caCertPool == nil {
+ caCertPool = x509.NewCertPool()
+ }
+ if !caCertPool.AppendCertsFromPEM(dialConfig.caCert) {
+ return nil, errors.Errorf("failed to parse certificate data:%s", string(dialConfig.caCert))
+ }
+ transport.TLSClientConfig.RootCAs = caCertPool
+ }
+
+ // The client certificate and key are only meaningful together.
+ if dialConfig.clientCert != nil && dialConfig.clientKey == nil {
+ return nil, errors.Errorf(`%s requires %s to be set`, changefeedbase.SinkParamClientCert, changefeedbase.SinkParamClientKey)
+ } else if dialConfig.clientKey != nil && dialConfig.clientCert == nil {
+ return nil, errors.Errorf(`%s requires %s to be set`, changefeedbase.SinkParamClientKey, changefeedbase.SinkParamClientCert)
+ }
+
+ if dialConfig.clientCert != nil && dialConfig.clientKey != nil {
+ cert, err := tls.X509KeyPair(dialConfig.clientCert, dialConfig.clientKey)
+ if err != nil {
+ return nil, errors.Wrap(err, `invalid client certificate data provided`)
+ }
+ transport.TLSClientConfig.Certificates = []tls.Certificate{cert}
+ }
+
+ return client, nil
+}
+
+// makePayloadForBytes wraps body in an HTTP POST request addressed to the sink
+// URL and bound to the client's context. The returned payload is the
+// *http.Request itself.
+func (sc *webhookSinkClient) makePayloadForBytes(body []byte) (SinkPayload, error) {
+	request, err := http.NewRequestWithContext(
+		sc.ctx, http.MethodPost, sc.url.String(), bytes.NewReader(body),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	// Only the JSON and CSV formats get a Content-Type; for any other format
+	// the header is left unset.
+	if sc.format == changefeedbase.OptFormatJSON {
+		request.Header.Set("Content-Type", applicationTypeJSON)
+	} else if sc.format == changefeedbase.OptFormatCSV {
+		request.Header.Set("Content-Type", applicationTypeCSV)
+	}
+
+	// Credentials are attached only when an auth header was configured.
+	if len(sc.authHeader) > 0 {
+		request.Header.Set(authorizationHeader, sc.authHeader)
+	}
+
+	return request, nil
+}
+
+// MakeResolvedPayload implements the SinkClient interface. The resolved
+// message body is wrapped in a request exactly like a row batch; topic is not
+// used by the webhook sink.
+func (sc *webhookSinkClient) MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) {
+ return sc.makePayloadForBytes(body)
+}
+
+// Flush implements the SinkClient interface. batch must be the *http.Request
+// produced by makePayloadForBytes (the type assertion panics otherwise). The
+// request is sent with the sink's HTTP client; any response status outside the
+// 2xx range is returned as an error made of the status line and response body.
+//
+// NOTE(review): ctx is not applied to the request, which keeps the context it
+// was created with.
+func (sc *webhookSinkClient) Flush(ctx context.Context, batch SinkPayload) error {
+	req := batch.(*http.Request)
+
+	// A request body is a one-shot reader: once a send attempt has consumed it,
+	// sending the same request again would post an exhausted body. Rewind it
+	// with GetBody (populated by http.NewRequestWithContext for *bytes.Reader
+	// bodies) so the same payload can be flushed more than once, e.g. when the
+	// caller retries a failed flush.
+	if req.GetBody != nil {
+		body, err := req.GetBody()
+		if err != nil {
+			return err
+		}
+		req.Body = body
+	}
+
+	res, err := sc.client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer res.Body.Close()
+
+	if !(res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusMultipleChoices) {
+		resBody, err := io.ReadAll(res.Body)
+		if err != nil {
+			return errors.Wrapf(err, "failed to read body for HTTP response with status: %d", res.StatusCode)
+		}
+		return fmt.Errorf("%s: %s", res.Status, string(resBody))
+	}
+	return nil
+}
+
+// Close implements the SinkClient interface. It closes the HTTP client's idle
+// connections and always returns nil.
+func (sc *webhookSinkClient) Close() error {
+ sc.client.CloseIdleConnections()
+ return nil
+}
+
+// validateWebhookOpts checks that the sink URL and encoding options are usable
+// with the webhook sink: the scheme must be the webhook HTTPS scheme, the
+// format must be JSON or CSV, and the envelope must be wrapped or bare. It
+// returns a descriptive error for the first check that fails. opts is not
+// inspected.
+func validateWebhookOpts(
+ u sinkURL, encodingOpts changefeedbase.EncodingOptions, opts changefeedbase.WebhookSinkOptions,
+) error {
+ if u.Scheme != changefeedbase.SinkSchemeWebhookHTTPS {
+ return errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeHTTPS)
+ }
+
+ // JSON and CSV are accepted as-is; anything else is rejected.
+ switch encodingOpts.Format {
+ case changefeedbase.OptFormatJSON:
+ case changefeedbase.OptFormatCSV:
+ default:
+ return errors.Errorf(`this sink is incompatible with %s=%s`,
+ changefeedbase.OptFormat, encodingOpts.Format)
+ }
+
+ switch encodingOpts.Envelope {
+ case changefeedbase.OptEnvelopeWrapped, changefeedbase.OptEnvelopeBare:
+ default:
+ return errors.Errorf(`this sink is incompatible with %s=%s`,
+ changefeedbase.OptEnvelope, encodingOpts.Envelope)
+ }
+
+ // NOTE(review): encodingOpts is received by value and is not returned, so
+ // the two assignments below presumably only change this function's local
+ // copy — confirm whether the caller was meant to observe them.
+ encodingOpts.TopicInValue = true
+
+ if encodingOpts.Envelope != changefeedbase.OptEnvelopeBare {
+ encodingOpts.KeyInValue = true
+ }
+
+ return nil
+}
+
+// webhookCSVBuffer is the BatchBuffer used for the CSV format. It accumulates
+// the raw bytes of every appended value into a single slice.
+type webhookCSVBuffer struct {
+ // bytes holds the appended values back to back, in arrival order.
+ bytes []byte
+ // messageCount is the number of values appended so far.
+ messageCount int
+ // sc is the owning client; it supplies the flush policy and payload builder.
+ sc *webhookSinkClient
+}
+
+var _ BatchBuffer = (*webhookCSVBuffer)(nil)
+
+// Append implements the BatchBuffer interface. Only value is stored; key and
+// topic are ignored. No separator is inserted between values.
+func (cb *webhookCSVBuffer) Append(key []byte, value []byte, topic string) {
+ cb.bytes = append(cb.bytes, value...)
+ cb.messageCount += 1
+}
+
+// ShouldFlush implements the BatchBuffer interface by applying the client's
+// batch thresholds to the buffered byte and message counts.
+func (cb *webhookCSVBuffer) ShouldFlush() bool {
+ return cb.sc.shouldFlush(len(cb.bytes), cb.messageCount)
+}
+
+// Close implements the BatchBuffer interface. The accumulated bytes become the
+// request body as-is.
+func (cb *webhookCSVBuffer) Close() (SinkPayload, error) {
+ return cb.sc.makePayloadForBytes(cb.bytes)
+}
+
+// webhookJSONBuffer is the BatchBuffer used for non-CSV formats. It keeps each
+// appended value as a separate message so they can be joined into one JSON
+// document when the buffer is closed.
+type webhookJSONBuffer struct {
+ // messages holds the appended values in arrival order.
+ messages [][]byte
+ // numBytes is the total length of all values in messages.
+ numBytes int
+ // sc is the owning client; it supplies the flush policy and payload builder.
+ sc *webhookSinkClient
+}
+
+var _ BatchBuffer = (*webhookJSONBuffer)(nil)
+
+// Append implements the BatchBuffer interface. Only value is stored; key and
+// topic are ignored. The slice is retained without copying.
+func (jb *webhookJSONBuffer) Append(key []byte, value []byte, topic string) {
+ jb.messages = append(jb.messages, value)
+ jb.numBytes += len(value)
+}
+
+// ShouldFlush implements the BatchBuffer interface by applying the client's
+// batch thresholds to the buffered byte and message counts.
+func (jb *webhookJSONBuffer) ShouldFlush() bool {
+ return jb.sc.shouldFlush(jb.numBytes, len(jb.messages))
+}
+
+// Close implements the BatchBuffer interface. The buffered messages are
+// rendered as a single JSON object of the form
+// {"payload":[m1,m2,...],"length":N} and wrapped into a sink payload.
+func (jb *webhookJSONBuffer) Close() (SinkPayload, error) {
+	const head = "{\"payload\":["
+	count := len(jb.messages)
+	tail := fmt.Sprintf("],\"length\":%d}", count)
+
+	// Reserve the whole output up front: envelope, message bytes, and one byte
+	// per message for the separating commas, so no reallocation is needed.
+	var out bytes.Buffer
+	out.Grow(len(head) + jb.numBytes + count + len(tail))
+
+	out.WriteString(head)
+	for idx := range jb.messages {
+		if idx > 0 {
+			out.WriteByte(',')
+		}
+		out.Write(jb.messages[idx])
+	}
+	out.WriteString(tail)
+
+	return jb.sc.makePayloadForBytes(out.Bytes())
+}
+
+// MakeBatchBuffer implements the SinkClient interface. It returns a CSV buffer
+// when the client's format is CSV and a JSON buffer otherwise; the JSON
+// buffer's message slice is pre-sized to the configured batch message count.
+func (sc *webhookSinkClient) MakeBatchBuffer() BatchBuffer {
+	if sc.format == changefeedbase.OptFormatCSV {
+		return &webhookCSVBuffer{sc: sc}
+	}
+	return &webhookJSONBuffer{
+		sc:       sc,
+		messages: make([][]byte, 0, sc.batchCfg.Messages),
+	}
+}
+
+// shouldFlush reports whether a buffer holding the given number of bytes and
+// messages has reached one of the configured batch thresholds.
+func (sc *webhookSinkClient) shouldFlush(bytes int, messages int) bool {
+	cfg := sc.batchCfg
+
+	// With no limits configured at all, every message is flushed immediately.
+	if cfg.Messages == 0 && cfg.Bytes == 0 && cfg.Frequency == 0 {
+		return true
+	}
+
+	// The message-count limit, when set, has been reached.
+	if cfg.Messages > 0 && messages >= cfg.Messages {
+		return true
+	}
+
+	// Otherwise flush only if the byte limit is set and has been reached.
+	return cfg.Bytes > 0 && bytes >= cfg.Bytes
+}
+
+// makeWebhookSink builds the webhook Sink: it parses the batching and retry
+// settings from the sink's JSON config, creates the webhook client, and wraps
+// the client in a batching sink. An error is returned if the config cannot be
+// parsed or the client cannot be created.
+func makeWebhookSink(
+ ctx context.Context,
+ u sinkURL,
+ encodingOpts changefeedbase.EncodingOptions,
+ opts changefeedbase.WebhookSinkOptions,
+ parallelism int,
+ pacerFactory func() *admission.Pacer,
+ source timeutil.TimeSource,
+ mb metricsRecorderBuilder,
+) (Sink, error) {
+ batchCfg, retryOpts, err := getSinkConfigFromJson(opts.JSONConfig)
+ if err != nil {
+ return nil, err
+ }
+
+ sinkClient, err := makeWebhookSinkClient(ctx, u, encodingOpts, opts, batchCfg, parallelism)
+ if err != nil {
+ return nil, err
+ }
+
+ // The configured batch frequency is passed on as a time.Duration, and
+ // parallelism is handed to both the client (above) and the batching sink.
+ // NOTE(review): the meaning of the nil argument is not visible here — see
+ // makeBatchingSink's signature.
+ return makeBatchingSink(
+ ctx,
+ sinkTypeWebhook,
+ sinkClient,
+ time.Duration(batchCfg.Frequency),
+ retryOpts,
+ parallelism,
+ nil,
+ pacerFactory,
+ source,
+ mb(requiresResourceAccounting),
+ ), nil
+}
diff --git a/pkg/ccl/changefeedccl/telemetry.go b/pkg/ccl/changefeedccl/telemetry.go
index 1788b8789069..906b634a29d9 100644
--- a/pkg/ccl/changefeedccl/telemetry.go
+++ b/pkg/ccl/changefeedccl/telemetry.go
@@ -136,6 +136,8 @@ type telemetryMetricsRecorder struct {
inner metricsRecorder
}
+var _ metricsRecorder = (*telemetryMetricsRecorder)(nil)
+
func (r *telemetryMetricsRecorder) close() {
r.telemetryLogger.close()
}
@@ -184,6 +186,14 @@ func (r *telemetryMetricsRecorder) recordSizeBasedFlush() {
r.inner.recordSizeBasedFlush()
}
+// recordParallelIOQueueLatency passes the latency through to the wrapped
+// metrics recorder unchanged.
+func (r *telemetryMetricsRecorder) recordParallelIOQueueLatency(latency time.Duration) {
+ r.inner.recordParallelIOQueueLatency(latency)
+}
+
+// recordSinkIOInflightChange passes the delta through to the wrapped metrics
+// recorder unchanged.
+func (r *telemetryMetricsRecorder) recordSinkIOInflightChange(delta int64) {
+ r.inner.recordSinkIOInflightChange(delta)
+}
+
// ContinuousTelemetryInterval determines the interval at which each node emits telemetry events
// during the lifespan of each enterprise changefeed.
var ContinuousTelemetryInterval = settings.RegisterDurationSetting(
diff --git a/pkg/cmd/roachtest/grafana/configs/changefeed-roachtest-grafana-dashboard.json b/pkg/cmd/roachtest/grafana/configs/changefeed-roachtest-grafana-dashboard.json
index 8b6d460720d4..15ab4bfab8ad 100644
--- a/pkg/cmd/roachtest/grafana/configs/changefeed-roachtest-grafana-dashboard.json
+++ b/pkg/cmd/roachtest/grafana/configs/changefeed-roachtest-grafana-dashboard.json
@@ -1141,7 +1141,7 @@
"h": 7,
"w": 6,
"x": 0,
- "y": 24
+ "y": 40
},
"id": 18,
"options": {
@@ -1260,7 +1260,7 @@
"h": 7,
"w": 6,
"x": 6,
- "y": 24
+ "y": 40
},
"id": 57,
"options": {
@@ -1353,7 +1353,7 @@
"h": 7,
"w": 5,
"x": 12,
- "y": 24
+ "y": 40
},
"id": 72,
"options": {
@@ -1459,7 +1459,7 @@
"h": 7,
"w": 6,
"x": 0,
- "y": 27
+ "y": 43
},
"id": 35,
"options": {
@@ -1564,7 +1564,7 @@
"h": 7,
"w": 6,
"x": 6,
- "y": 27
+ "y": 43
},
"id": 33,
"options": {
@@ -1657,7 +1657,7 @@
"h": 7,
"w": 6,
"x": 12,
- "y": 27
+ "y": 43
},
"id": 39,
"options": {
@@ -1748,8 +1748,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -1765,7 +1764,7 @@
"h": 6,
"w": 4,
"x": 0,
- "y": 20
+ "y": 36
},
"id": 63,
"options": {
@@ -1868,8 +1867,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -1885,7 +1883,7 @@
"h": 6,
"w": 4,
"x": 4,
- "y": 20
+ "y": 36
},
"id": 64,
"options": {
@@ -1988,8 +1986,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -2005,7 +2002,7 @@
"h": 6,
"w": 4,
"x": 8,
- "y": 20
+ "y": 36
},
"id": 65,
"options": {
@@ -2084,8 +2081,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -2101,7 +2097,7 @@
"h": 6,
"w": 4,
"x": 12,
- "y": 20
+ "y": 36
},
"id": 75,
"options": {
@@ -2179,8 +2175,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -2196,7 +2191,7 @@
"h": 6,
"w": 4,
"x": 16,
- "y": 20
+ "y": 36
},
"id": 19,
"options": {
@@ -2233,7 +2228,7 @@
"type": "row"
},
{
- "collapsed": true,
+ "collapsed": false,
"gridPos": {
"h": 1,
"w": 24,
@@ -2241,474 +2236,660 @@
"y": 20
},
"id": 14,
- "panels": [
- {
- "datasource": {
- "type": "prometheus",
- "uid": "localprom"
+ "panels": [],
+ "title": "Sink",
+ "type": "row"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "localprom"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
},
- "fieldConfig": {
- "defaults": {
- "color": {
- "mode": "palette-classic"
- },
- "custom": {
- "axisCenteredZero": false,
- "axisColorMode": "text",
- "axisLabel": "",
- "axisPlacement": "auto",
- "barAlignment": 0,
- "drawStyle": "line",
- "fillOpacity": 0,
- "gradientMode": "none",
- "hideFrom": {
- "legend": false,
- "tooltip": false,
- "viz": false
- },
- "lineInterpolation": "linear",
- "lineWidth": 1,
- "pointSize": 5,
- "scaleDistribution": {
- "type": "linear"
- },
- "showPoints": "auto",
- "spanNulls": false,
- "stacking": {
- "group": "A",
- "mode": "none"
- },
- "thresholdsStyle": {
- "mode": "off"
- }
- },
- "mappings": [],
- "thresholds": {
- "mode": "absolute",
- "steps": [
- {
- "color": "green",
- "value": null
- },
- {
- "color": "red",
- "value": 80
- }
- ]
- },
- "unit": "ns"
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
},
- "overrides": []
- },
- "gridPos": {
- "h": 6,
- "w": 6,
- "x": 0,
- "y": 21
- },
- "id": 54,
- "options": {
- "legend": {
- "calcs": [],
- "displayMode": "list",
- "placement": "bottom",
- "showLegend": true
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
},
- "tooltip": {
- "mode": "single",
- "sort": "none"
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
}
},
- "pluginVersion": "9.2.3",
- "targets": [
- {
- "datasource": {
- "type": "prometheus",
- "uid": "localprom"
- },
- "editorMode": "code",
- "expr": "histogram_quantile(0.50, sum(rate(changefeed_sink_batch_hist_nanos_bucket[$__rate_interval])) by (le))",
- "hide": false,
- "legendFormat": "p50 {{label_name}}",
- "range": true,
- "refId": "C"
- },
- {
- "datasource": {
- "type": "prometheus",
- "uid": "localprom"
- },
- "editorMode": "code",
- "expr": "histogram_quantile(0.75, sum(rate(changefeed_sink_batch_hist_nanos_bucket[$__rate_interval])) by (le))",
- "hide": false,
- "legendFormat": "p75 {{label_name}}",
- "range": true,
- "refId": "D"
- },
- {
- "datasource": {
- "type": "prometheus",
- "uid": "localprom"
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
},
- "editorMode": "code",
- "expr": "histogram_quantile(0.95, sum(rate(changefeed_sink_batch_hist_nanos_bucket[$__rate_interval])) by (le))",
- "hide": false,
- "legendFormat": "p95 {{label_name}}",
- "range": true,
- "refId": "E"
- }
- ],
- "title": "Sink Buffer Time Batched",
- "type": "timeseries"
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ },
+ "unit": "ns"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 6,
+ "w": 6,
+ "x": 0,
+ "y": 21
+ },
+ "id": 54,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
},
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "pluginVersion": "9.2.3",
+ "targets": [
{
"datasource": {
"type": "prometheus",
"uid": "localprom"
},
- "fieldConfig": {
- "defaults": {
- "color": {
- "mode": "palette-classic"
- },
- "custom": {
- "axisCenteredZero": false,
- "axisColorMode": "text",
- "axisLabel": "",
- "axisPlacement": "auto",
- "barAlignment": 0,
- "drawStyle": "line",
- "fillOpacity": 0,
- "gradientMode": "none",
- "hideFrom": {
- "legend": false,
- "tooltip": false,
- "viz": false
- },
- "lineInterpolation": "linear",
- "lineWidth": 1,
- "pointSize": 5,
- "scaleDistribution": {
- "type": "linear"
- },
- "showPoints": "auto",
- "spanNulls": false,
- "stacking": {
- "group": "A",
- "mode": "none"
- },
- "thresholdsStyle": {
- "mode": "off"
- }
- },
- "mappings": [],
- "thresholds": {
- "mode": "absolute",
- "steps": [
- {
- "color": "green",
- "value": null
- },
- {
- "color": "red",
- "value": 80
- }
- ]
- },
- "unit": "ns"
- },
- "overrides": []
- },
- "gridPos": {
- "h": 6,
- "w": 5,
- "x": 6,
- "y": 21
+ "editorMode": "code",
+ "expr": "histogram_quantile(0.50, sum(rate(changefeed_sink_batch_hist_nanos_bucket[$__rate_interval])) by (le))",
+ "hide": false,
+ "legendFormat": "p50 {{label_name}}",
+ "range": true,
+ "refId": "C"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "localprom"
},
- "id": 55,
- "options": {
- "legend": {
- "calcs": [],
- "displayMode": "list",
- "placement": "bottom",
- "showLegend": true
+ "editorMode": "code",
+ "expr": "histogram_quantile(0.75, sum(rate(changefeed_sink_batch_hist_nanos_bucket[$__rate_interval])) by (le))",
+ "hide": false,
+ "legendFormat": "p75 {{label_name}}",
+ "range": true,
+ "refId": "D"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "localprom"
+ },
+ "editorMode": "code",
+ "expr": "histogram_quantile(0.95, sum(rate(changefeed_sink_batch_hist_nanos_bucket[$__rate_interval])) by (le))",
+ "hide": false,
+ "legendFormat": "p95 {{label_name}}",
+ "range": true,
+ "refId": "E"
+ }
+ ],
+ "title": "Sink Buffer Time Batched",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "localprom"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
},
- "tooltip": {
- "mode": "single",
- "sort": "none"
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
}
},
- "pluginVersion": "9.2.3",
- "targets": [
- {
- "datasource": {
- "type": "prometheus",
- "uid": "localprom"
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
},
- "editorMode": "code",
- "expr": "histogram_quantile(0.50, sum(rate(changefeed_flush_hist_nanos_bucket[$__rate_interval])) by (le))",
- "hide": false,
- "legendFormat": "p50 {{label_name}}",
- "range": true,
- "refId": "C"
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ },
+ "unit": "ns"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 6,
+ "w": 5,
+ "x": 6,
+ "y": 21
+ },
+ "id": 55,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "pluginVersion": "9.2.3",
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "localprom"
+ },
+ "editorMode": "code",
+ "expr": "histogram_quantile(0.50, sum(rate(changefeed_flush_hist_nanos_bucket[$__rate_interval])) by (le))",
+ "hide": false,
+ "legendFormat": "p50 {{label_name}}",
+ "range": true,
+ "refId": "C"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "localprom"
+ },
+ "editorMode": "code",
+ "expr": "histogram_quantile(0.75, sum(rate(changefeed_flush_hist_nanos_bucket[$__rate_interval])) by (le))",
+ "hide": false,
+ "legendFormat": "p75 {{label_name}}",
+ "range": true,
+ "refId": "D"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "localprom"
+ },
+ "editorMode": "code",
+ "expr": "histogram_quantile(0.95, sum(rate(changefeed_flush_hist_nanos_bucket[$__rate_interval])) by (le))",
+ "hide": false,
+ "legendFormat": "p95 {{label_name}}",
+ "range": true,
+ "refId": "E"
+ }
+ ],
+ "title": "Sink Flush Time",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "localprom"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
},
- {
- "datasource": {
- "type": "prometheus",
- "uid": "localprom"
- },
- "editorMode": "code",
- "expr": "histogram_quantile(0.75, sum(rate(changefeed_flush_hist_nanos_bucket[$__rate_interval])) by (le))",
- "hide": false,
- "legendFormat": "p75 {{label_name}}",
- "range": true,
- "refId": "D"
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
},
- {
- "datasource": {
- "type": "prometheus",
- "uid": "localprom"
- },
- "editorMode": "code",
- "expr": "histogram_quantile(0.95, sum(rate(changefeed_flush_hist_nanos_bucket[$__rate_interval])) by (le))",
- "hide": false,
- "legendFormat": "p95 {{label_name}}",
- "range": true,
- "refId": "E"
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
}
- ],
- "title": "Sink Flush Time",
- "type": "timeseries"
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ },
+ "unit": "cps"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 6,
+ "w": 5,
+ "x": 11,
+ "y": 21
+ },
+ "id": 60,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": false
},
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "pluginVersion": "9.2.3",
+ "targets": [
{
"datasource": {
"type": "prometheus",
"uid": "localprom"
},
- "fieldConfig": {
- "defaults": {
- "color": {
- "mode": "palette-classic"
- },
- "custom": {
- "axisCenteredZero": false,
- "axisColorMode": "text",
- "axisLabel": "",
- "axisPlacement": "auto",
- "barAlignment": 0,
- "drawStyle": "line",
- "fillOpacity": 0,
- "gradientMode": "none",
- "hideFrom": {
- "legend": false,
- "tooltip": false,
- "viz": false
- },
- "lineInterpolation": "linear",
- "lineWidth": 1,
- "pointSize": 5,
- "scaleDistribution": {
- "type": "linear"
- },
- "showPoints": "auto",
- "spanNulls": false,
- "stacking": {
- "group": "A",
- "mode": "none"
- },
- "thresholdsStyle": {
- "mode": "off"
- }
- },
- "mappings": [],
- "thresholds": {
- "mode": "absolute",
- "steps": [
- {
- "color": "green",
- "value": null
- },
- {
- "color": "red",
- "value": 80
- }
- ]
- },
- "unit": "cps"
+ "editorMode": "code",
+ "expr": "sum(rate(changefeed_flushes[$__rate_interval]))",
+ "hide": false,
+ "legendFormat": "total_flushes",
+ "range": true,
+ "refId": "E"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "localprom"
+ },
+ "editorMode": "code",
+ "expr": "sum(rate(changefeed_size_based_flushes[$__rate_interval]))",
+ "hide": false,
+ "legendFormat": "size_based_flushes",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "Sink Flush Rate",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "localprom"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
},
- "overrides": []
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
},
- "gridPos": {
- "h": 6,
- "w": 5,
- "x": 11,
- "y": 21
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
},
- "id": 60,
- "options": {
- "legend": {
- "calcs": [],
- "displayMode": "list",
- "placement": "bottom",
- "showLegend": true
+ "unit": "ns"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 6,
+ "w": 5,
+ "x": 16,
+ "y": 21
+ },
+ "id": 25,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "pluginVersion": "9.2.3",
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "localprom"
+ },
+ "editorMode": "code",
+ "expr": "histogram_quantile(0.50, sum(rate(changefeed_checkpoint_hist_nanos_bucket[5m])) by (le))",
+ "hide": false,
+ "legendFormat": "p50 {{label_name}}",
+ "range": true,
+ "refId": "C"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "localprom"
+ },
+ "editorMode": "code",
+ "expr": "histogram_quantile(0.75, sum(rate(changefeed_checkpoint_hist_nanos_bucket[5m])) by (le))",
+ "hide": false,
+ "legendFormat": "p75 {{label_name}}",
+ "range": true,
+ "refId": "D"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "localprom"
+ },
+ "editorMode": "code",
+ "expr": "histogram_quantile(0.95, sum(rate(changefeed_checkpoint_hist_nanos_bucket[5m])) by (le))",
+ "hide": false,
+ "legendFormat": "p95 {{label_name}}",
+ "range": true,
+ "refId": "E"
+ }
+ ],
+ "title": "Time Spent Checkpointing",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "localprom"
+ },
+ "description": "",
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
},
- "tooltip": {
- "mode": "single",
- "sort": "none"
+ "thresholdsStyle": {
+ "mode": "off"
}
},
- "pluginVersion": "9.2.3",
- "targets": [
- {
- "datasource": {
- "type": "prometheus",
- "uid": "localprom"
- },
- "editorMode": "code",
- "expr": "sum(rate(changefeed_flushes[$__rate_interval]))",
- "hide": false,
- "legendFormat": "total_flushes",
- "range": true,
- "refId": "E"
- },
- {
- "datasource": {
- "type": "prometheus",
- "uid": "localprom"
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
},
- "editorMode": "code",
- "expr": "sum(rate(changefeed_size_based_flushes[$__rate_interval]))",
- "hide": false,
- "legendFormat": "size_based_flushes",
- "range": true,
- "refId": "A"
- }
- ],
- "title": "Sink Flush Rate",
- "type": "timeseries"
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ },
+ "unit": "none"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 7,
+ "w": 6,
+ "x": 0,
+ "y": 27
+ },
+ "id": 77,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
},
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
{
"datasource": {
"type": "prometheus",
"uid": "localprom"
},
- "fieldConfig": {
- "defaults": {
- "color": {
- "mode": "palette-classic"
- },
- "custom": {
- "axisCenteredZero": false,
- "axisColorMode": "text",
- "axisLabel": "",
- "axisPlacement": "auto",
- "barAlignment": 0,
- "drawStyle": "line",
- "fillOpacity": 0,
- "gradientMode": "none",
- "hideFrom": {
- "legend": false,
- "tooltip": false,
- "viz": false
- },
- "lineInterpolation": "linear",
- "lineWidth": 1,
- "pointSize": 5,
- "scaleDistribution": {
- "type": "linear"
- },
- "showPoints": "auto",
- "spanNulls": false,
- "stacking": {
- "group": "A",
- "mode": "none"
- },
- "thresholdsStyle": {
- "mode": "off"
- }
- },
- "mappings": [],
- "thresholds": {
- "mode": "absolute",
- "steps": [
- {
- "color": "green",
- "value": null
- },
- {
- "color": "red",
- "value": 80
- }
- ]
- },
- "unit": "ns"
- },
- "overrides": []
- },
- "gridPos": {
- "h": 6,
- "w": 5,
- "x": 16,
- "y": 21
+ "editorMode": "code",
+ "expr": "sum by(scope) (rate(changefeed_sink_io_inflight{scope!=\"\"}[$__rate_interval]))",
+ "legendFormat": "__auto",
+ "range": true,
+ "refId": "A"
+ }
+ ],
+ "title": "Batches currently in IO",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "localprom"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
},
- "id": 25,
- "options": {
- "legend": {
- "calcs": [],
- "displayMode": "list",
- "placement": "bottom",
- "showLegend": true
+ "custom": {
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
},
- "tooltip": {
- "mode": "single",
- "sort": "none"
- }
- },
- "pluginVersion": "9.2.3",
- "targets": [
- {
- "datasource": {
- "type": "prometheus",
- "uid": "localprom"
- },
- "editorMode": "code",
- "expr": "histogram_quantile(0.50, sum(rate(changefeed_checkpoint_hist_nanos_bucket[5m])) by (le))",
- "hide": false,
- "legendFormat": "p50 {{label_name}}",
- "range": true,
- "refId": "C"
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
},
- {
- "datasource": {
- "type": "prometheus",
- "uid": "localprom"
- },
- "editorMode": "code",
- "expr": "histogram_quantile(0.75, sum(rate(changefeed_checkpoint_hist_nanos_bucket[5m])) by (le))",
- "hide": false,
- "legendFormat": "p75 {{label_name}}",
- "range": true,
- "refId": "D"
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
},
- {
- "datasource": {
- "type": "prometheus",
- "uid": "localprom"
- },
- "editorMode": "code",
- "expr": "histogram_quantile(0.95, sum(rate(changefeed_checkpoint_hist_nanos_bucket[5m])) by (le))",
- "hide": false,
- "legendFormat": "p95 {{label_name}}",
- "range": true,
- "refId": "E"
+ "thresholdsStyle": {
+ "mode": "off"
}
- ],
- "title": "Time Spent Checkpointing",
- "type": "timeseries"
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ },
+ "unit": "ns"
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 7,
+ "w": 5,
+ "x": 6,
+ "y": 27
+ },
+ "id": 80,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "localprom"
+ },
+ "editorMode": "code",
+ "expr": "sum by (scope) ((rate(changefeed_parallel_io_queue_nanos_sum{scope!=\"\"}[5m]) / rate(changefeed_parallel_io_queue_nanos_count{scope!=\"\"}[5m])))",
+ "legendFormat": "{{scope}}",
+ "range": true,
+ "refId": "A"
}
],
- "title": "Sink",
- "type": "row"
+ "title": "Parallel IO Average Queue Time",
+ "type": "timeseries"
},
{
"collapsed": true,
@@ -2716,7 +2897,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 21
+ "y": 34
},
"id": 16,
"panels": [
@@ -2766,8 +2947,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -2783,7 +2963,7 @@
"h": 8,
"w": 8,
"x": 0,
- "y": 22
+ "y": 38
},
"id": 22,
"options": {
@@ -2860,8 +3040,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -2877,7 +3056,7 @@
"h": 8,
"w": 10,
"x": 8,
- "y": 22
+ "y": 38
},
"id": 24,
"options": {
@@ -2954,8 +3133,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -2971,7 +3149,7 @@
"h": 8,
"w": 8,
"x": 0,
- "y": 30
+ "y": 46
},
"id": 23,
"options": {
@@ -3048,8 +3226,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -3065,7 +3242,7 @@
"h": 8,
"w": 5,
"x": 8,
- "y": 30
+ "y": 46
},
"id": 50,
"options": {
@@ -3141,8 +3318,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -3158,7 +3334,7 @@
"h": 8,
"w": 5,
"x": 13,
- "y": 30
+ "y": 46
},
"id": 51,
"options": {
@@ -3200,7 +3376,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 22
+ "y": 35
},
"id": 59,
"panels": [],
@@ -3213,7 +3389,7 @@
"h": 1,
"w": 24,
"x": 0,
- "y": 23
+ "y": 36
},
"id": 27,
"panels": [
@@ -3262,8 +3438,7 @@
"mode": "absolute",
"steps": [
{
- "color": "green",
- "value": null
+ "color": "green"
},
{
"color": "red",
@@ -3278,7 +3453,7 @@
"h": 7,
"w": 8,
"x": 0,
- "y": 24
+ "y": 40
},
"id": 31,
"options": {
diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go
index dd1dc9da998c..83c25cf37c0c 100644
--- a/pkg/cmd/roachtest/tests/cdc.go
+++ b/pkg/cmd/roachtest/tests/cdc.go
@@ -1235,6 +1235,7 @@ func registerCDC(r registry.Registry) {
ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "30m"})
+ // The deprecated webhook sink cannot sustain the throughput required for 100 warehouses, so enable the new webhook sink.
if _, err := ct.DB().Exec("SET CLUSTER SETTING changefeed.new_webhook_sink_enabled = true;"); err != nil {
ct.t.Fatal(err)
}
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index 88b6f0a866d2..d8f6315e0102 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -1564,6 +1564,7 @@ var charts = []sectionDescription{
"changefeed.checkpoint_hist_nanos",
"changefeed.flush_hist_nanos",
"changefeed.sink_batch_hist_nanos",
+ "changefeed.parallel_io_queue_nanos",
},
},
{
@@ -1588,6 +1589,12 @@ var charts = []sectionDescription{
"changefeed.flush.messages_pushback_nanos",
},
},
+ {
+ Title: "Sink IO",
+ Metrics: []string{
+ "changefeed.sink_io_inflight",
+ },
+ },
{
Title: "Batching",
Metrics: []string{
diff --git a/pkg/ts/catalog/metrics.go b/pkg/ts/catalog/metrics.go
index ce2f8f8bbc02..29fdb2ea422f 100644
--- a/pkg/ts/catalog/metrics.go
+++ b/pkg/ts/catalog/metrics.go
@@ -97,6 +97,7 @@ var histogramMetricsNames = map[string]struct{}{
"changefeed.message_size_hist": {},
"changefeed.commit_latency": {},
"changefeed.sink_batch_hist_nanos": {},
+ "changefeed.parallel_io_queue_nanos": {},
"replication.admit_latency": {},
"replication.commit_latency": {},
"replication.flush_hist_nanos": {},