diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index a40331d84d4b..6501f5f7c87e 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -17,6 +17,7 @@ changefeed.event_consumer_workers integer 0 the number of workers to use when pr changefeed.fast_gzip.enabled boolean true use fast gzip implementation changefeed.node_throttle_config string specifies node level throttling configuration for all changefeeeds changefeed.schema_feed.read_with_priority_after duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables +changefeed.sink_io_workers integer 0 the number of workers used by changefeeds when sending requests to the sink (currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. cloudstorage.azure.concurrent_upload_buffers integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks.Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload cloudstorage.http.custom_ca string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage cloudstorage.timeout duration 10m0s the timeout for import/export storage operations diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 01de5764264d..f65e5d4ce5de 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -23,6 +23,7 @@
changefeed.fast_gzip.enabled
boolean true use fast gzip implementation
changefeed.node_throttle_config
string specifies node level throttling configuration for all changefeeds
changefeed.schema_feed.read_with_priority_after
duration 1m0s retry with high priority if we were not able to read descriptors for too long; 0 disables +
changefeed.sink_io_workers
integer 0 the number of workers used by changefeeds when sending requests to the sink (currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value.
cloudstorage.azure.concurrent_upload_buffers
integer 1 controls the number of concurrent buffers that will be used by the Azure client when uploading chunks. Each buffer can buffer up to cloudstorage.write_chunk.size of memory during an upload
cloudstorage.http.custom_ca
string custom root CA (appended to system's default CAs) for verifying certificates when interacting with HTTPS storage
cloudstorage.timeout
duration 10m0s the timeout for import/export storage operations diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 5fa136eff7a3..480f7c7cdd91 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "alter_changefeed_stmt.go", "authorization.go", "avro.go", + "batching_sink.go", "changefeed.go", "changefeed_dist.go", "changefeed_processors.go", @@ -20,6 +21,7 @@ go_library( "event_processing.go", "metrics.go", "name.go", + "parallel_io.go", "parquet_sink_cloudstorage.go", "retry.go", "scheduled_changefeed.go", @@ -32,6 +34,7 @@ go_library( "sink_pubsub.go", "sink_sql.go", "sink_webhook.go", + "sink_webhook_v2.go", "telemetry.go", "testing_knobs.go", "tls.go", @@ -126,6 +129,7 @@ go_library( "//pkg/util/hlc", "//pkg/util/httputil", "//pkg/util/humanizeutil", + "//pkg/util/intsets", "//pkg/util/json", "//pkg/util/log", "//pkg/util/log/eventpb", diff --git a/pkg/ccl/changefeedccl/batching_sink.go b/pkg/ccl/changefeedccl/batching_sink.go new file mode 100644 index 000000000000..d764d7abcab6 --- /dev/null +++ b/pkg/ccl/changefeedccl/batching_sink.go @@ -0,0 +1,482 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "fmt" + "hash" + "sync" + "time" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" + "github.com/cockroachdb/cockroach/pkg/util/admission" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/intsets" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// SinkClient is an interface to an external sink, where messages are written +// into batches as they arrive and once ready are flushed out. +type SinkClient interface { + MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) + MakeBatchBuffer() BatchBuffer + Flush(context.Context, SinkPayload) error + Close() error +} + +// BatchBuffer is an interface to aggregate KVs into a payload that can be sent +// to the sink. +type BatchBuffer interface { + Append(key []byte, value []byte, topic string) + ShouldFlush() bool + + // Once all data has been Append'ed, Close can be called to return a finalized + // Payload that is valid for a pure IO operation via Flush. All final encoding + // work (ex: wrapping an array in an object with metadata) should be done in + // Close, as non-io-related work done in Flush would be unnecessarily repeated + // upon retries. + Close() (SinkPayload, error) +} + +// SinkPayload is an interface representing a sink-specific representation of a +// batch of messages that is ready to be emitted by its Flush method. +type SinkPayload interface{} + +// batchingSink wraps a SinkClient to provide a Sink implementation that calls +// the SinkClient methods to form batches and flushes those batches across +// multiple parallel IO workers.
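+// +// A rough sketch of the intended call pattern, assuming a hypothetical client +// implementation (the identifiers buf, payload, key, value, topic, and ctx +// here are illustrative, not part of this change): +// +// buf := client.MakeBatchBuffer() +// buf.Append(key, value, topic) // repeated as rows arrive +// if buf.ShouldFlush() { +// payload, err := buf.Close() // all encoding work happens once, here +// if err == nil { +// err = client.Flush(ctx, payload) // pure IO; safe to retry +// } +// }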
+type batchingSink struct { + client SinkClient + topicNamer *TopicNamer + concreteType sinkType + + ioWorkers int + minFlushFrequency time.Duration + retryOpts retry.Options + + ts timeutil.TimeSource + metrics metricsRecorder + knobs batchingSinkKnobs + + // eventCh is the channel used to send requests from the Sink caller routines + // to the batching routine. Messages can either be a flushReq or a rowEvent. + eventCh chan interface{} + + pacer *admission.Pacer + pacerFactory func() *admission.Pacer + + termErr error + wg ctxgroup.Group + hasher hash.Hash32 + doneCh chan struct{} +} + +type batchingSinkKnobs struct { + OnAppend func(*rowEvent) +} + +type flushReq struct { + waiter chan struct{} +} + +type rowEvent struct { + key []byte + val []byte + topic string + + alloc kvevent.Alloc + mvcc hlc.Timestamp +} + +// Flush implements the Sink interface, returning the first error that has +// occurred in the past EmitRow calls. +func (s *batchingSink) Flush(ctx context.Context) error { + defer s.metrics.recordFlushRequestCallback()() + flushWaiter := make(chan struct{}) + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.doneCh: + case s.eventCh <- flushReq{waiter: flushWaiter}: + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.doneCh: + return nil + case <-flushWaiter: + if s.termErr != nil { + return s.termErr + } + } + + // Refresh the pacer in case any settings have changed. s.pacer can safely be + // assigned since once the Flush has completed waiting, no new messages exist + // to be processed so pacer.Pace won't be called by the batching worker. + s.pacer = s.pacerFactory() + + return nil +} + +var _ Sink = (*batchingSink)(nil) + +// Event structs and batch structs which are transferred across routines (and +// therefore escape to the heap) can both be incredibly frequent (every event +// may be its own batch) and temporary, so to avoid GC thrashing they are both +// claimed and freed from object pools. +var eventPool sync.Pool = sync.Pool{ + New: func() interface{} { + return new(rowEvent) + }, +} + +func newRowEvent() *rowEvent { + return eventPool.Get().(*rowEvent) +} +func freeRowEvent(e *rowEvent) { + *e = rowEvent{} + eventPool.Put(e) +} + +var batchPool sync.Pool = sync.Pool{ + New: func() interface{} { + return new(sinkBatch) + }, +} + +func newSinkBatch() *sinkBatch { + return batchPool.Get().(*sinkBatch) +} +func freeSinkBatchEvent(b *sinkBatch) { + *b = sinkBatch{} + batchPool.Put(b) +} + +// EmitRow implements the Sink interface. +func (s *batchingSink) EmitRow( + ctx context.Context, + topic TopicDescriptor, + key, value []byte, + updated, mvcc hlc.Timestamp, + alloc kvevent.Alloc, +) error { + s.metrics.recordMessageSize(int64(len(key) + len(value))) + + payload := newRowEvent() + payload.key = key + payload.val = value + payload.topic = "" // unimplemented for now + payload.mvcc = mvcc + payload.alloc = alloc + + select { + case <-ctx.Done(): + return ctx.Err() + case s.eventCh <- payload: + case <-s.doneCh: + } + + return nil +} + +// EmitResolvedTimestamp implements the Sink interface.
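+// Note that resolved payloads bypass batching: the sink is flushed first so +// that a resolved message cannot overtake rows still buffered in a batch, and +// the payload is then emitted directly with the same retry options used for +// batches.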
+func (s *batchingSink) EmitResolvedTimestamp( + ctx context.Context, encoder Encoder, resolved hlc.Timestamp, +) error { + data, err := encoder.EncodeResolvedTimestamp(ctx, "", resolved) + if err != nil { + return err + } + payload, err := s.client.MakeResolvedPayload(data, "") + if err != nil { + return err + } + + if err = s.Flush(ctx); err != nil { + return err + } + return retry.WithMaxAttempts(ctx, s.retryOpts, s.retryOpts.MaxRetries+1, func() error { + return s.client.Flush(ctx, payload) + }) +} + +// Close implements the Sink interface. +func (s *batchingSink) Close() error { + close(s.doneCh) + _ = s.wg.Wait() + s.pacer.Close() + return s.client.Close() +} + +// Dial implements the Sink interface. +func (s *batchingSink) Dial() error { + return nil +} + +// getConcreteType implements the Sink interface. +func (s *batchingSink) getConcreteType() sinkType { + return s.concreteType +} + +// sinkBatch stores an in-progress/complete batch of messages, along with +// metadata related to the batch. +type sinkBatch struct { + buffer BatchBuffer + payload SinkPayload // payload is nil until FinalizePayload has been called + + numMessages int + numKVBytes int // the total amount of uncompressed kv data in the batch + keys intsets.Fast // the set of keys within the batch to provide to parallelIO + bufferTime time.Time // the earliest time a message was inserted into the batch + mvcc hlc.Timestamp + + alloc kvevent.Alloc + hasher hash.Hash32 +} + +// FinalizePayload closes the writer to produce a payload that is ready to be +// Flushed by the SinkClient. +func (sb *sinkBatch) FinalizePayload() error { + payload, err := sb.buffer.Close() + if err != nil { + return err + } + sb.payload = payload + return nil +} + +// Keys implements the IORequest interface. +func (sb *sinkBatch) Keys() intsets.Fast { + return sb.keys +} + +func (sb *sinkBatch) isEmpty() bool { + return sb.numMessages == 0 +} + +func hashToInt(h hash.Hash32, buf []byte) int { + h.Reset() + h.Write(buf) + return int(h.Sum32()) +} + +// Append adds the contents of a kvEvent to the batch, merging its alloc pool. +func (sb *sinkBatch) Append(e *rowEvent) { + if sb.isEmpty() { + sb.bufferTime = timeutil.Now() + } + + sb.buffer.Append(e.key, e.val, e.topic) + + sb.keys.Add(hashToInt(sb.hasher, e.key)) + sb.numMessages += 1 + sb.numKVBytes += len(e.key) + len(e.val) + + if sb.mvcc.IsEmpty() || e.mvcc.Less(sb.mvcc) { + sb.mvcc = e.mvcc + } + + sb.alloc.Merge(&e.alloc) +} + +func (s *batchingSink) handleError(err error) { + if s.termErr == nil { + s.termErr = err + } +} + +func (s *batchingSink) newBatchBuffer() *sinkBatch { + batch := newSinkBatch() + batch.buffer = s.client.MakeBatchBuffer() + batch.hasher = s.hasher + return batch +} + +// runBatchingWorker combines 1 or more row events into batches, sending the IO +// requests out either once the batch is full or a flush request arrives. +func (s *batchingSink) runBatchingWorker(ctx context.Context) { + batchBuffer := s.newBatchBuffer() + + // Once finalized, batches are sent to a parallelIO struct which handles + // performing multiple Flushes in parallel while maintaining Keys() ordering. 
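+ // By the time a batch reaches the handler below, FinalizePayload has already + // done all encoding work, so a retry repeats only the network call.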
+ ioHandler := func(ctx context.Context, req IORequest) error { + batch, _ := req.(*sinkBatch) + defer s.metrics.recordSinkIOInflightChange(int64(-batch.numMessages)) + s.metrics.recordSinkIOInflightChange(int64(batch.numMessages)) + return s.client.Flush(ctx, batch.payload) + } + ioEmitter := newParallelIO(ctx, s.retryOpts, s.ioWorkers, ioHandler, s.metrics) + defer ioEmitter.Close() + + // Flushing requires tracking the number of inflight messages and confirming + // completion to the requester once the counter reaches 0. + inflight := 0 + var sinkFlushWaiter chan struct{} + + handleResult := func(result *ioResult) { + batch, _ := result.request.(*sinkBatch) + + if result.err != nil { + s.handleError(result.err) + } else { + s.metrics.recordEmittedBatch( + batch.bufferTime, batch.numMessages, batch.mvcc, batch.numKVBytes, sinkDoesNotCompress, + ) + } + + inflight -= batch.numMessages + + if (result.err != nil || inflight == 0) && sinkFlushWaiter != nil { + close(sinkFlushWaiter) + sinkFlushWaiter = nil + } + + freeIOResult(result) + batch.alloc.Release(ctx) + freeSinkBatchEvent(batch) + } + + tryFlushBatch := func() error { + if batchBuffer.isEmpty() { + return nil + } + toFlush := batchBuffer + batchBuffer = s.newBatchBuffer() + + if err := toFlush.FinalizePayload(); err != nil { + return err + } + + // Emitting needs to also handle any incoming results to avoid a deadlock + // with trying to emit while the emitter is blocked on returning a result. + for { + select { + case <-ctx.Done(): + return ctx.Err() + case ioEmitter.requestCh <- toFlush: + case result := <-ioEmitter.resultCh: + handleResult(result) + continue + case <-s.doneCh: + } + break + } + + return nil + } + + flushTimer := s.ts.NewTimer() + defer flushTimer.Stop() + + for { + select { + case req := <-s.eventCh: + if err := s.pacer.Pace(ctx); err != nil { + if pacerLogEvery.ShouldLog() { + log.Errorf(ctx, "automatic sink batcher pacing: %v", err) + } + } + + switch r := req.(type) { + case *rowEvent: + if s.termErr != nil { + continue + } + + inflight += 1 + + // If we're about to append to an empty batch, start the timer to + // guarantee the messages do not stay buffered longer than the + // configured frequency. + if batchBuffer.isEmpty() && s.minFlushFrequency > 0 { + flushTimer.Reset(s.minFlushFrequency) + } + + batchBuffer.Append(r) + if s.knobs.OnAppend != nil { + s.knobs.OnAppend(r) + } + + // The event struct can be freed as the contents are expected to be + // managed by the batch instead. 
+ freeRowEvent(r) + + if batchBuffer.buffer.ShouldFlush() { + s.metrics.recordSizeBasedFlush() + if err := tryFlushBatch(); err != nil { + s.handleError(err) + } + } + case flushReq: + if inflight == 0 || s.termErr != nil { + close(r.waiter) + } else { + sinkFlushWaiter = r.waiter + if err := tryFlushBatch(); err != nil { + s.handleError(err) + } + } + default: + s.handleError(fmt.Errorf("received request of unknown type: %v", r)) + } + case result := <-ioEmitter.resultCh: + handleResult(result) + case <-flushTimer.Ch(): + flushTimer.MarkRead() + if err := tryFlushBatch(); err != nil { + s.handleError(err) + } + case <-ctx.Done(): + return + case <-s.doneCh: + return + } + } +} + +func makeBatchingSink( + ctx context.Context, + concreteType sinkType, + client SinkClient, + minFlushFrequency time.Duration, + retryOpts retry.Options, + numWorkers int, + topicNamer *TopicNamer, + pacerFactory func() *admission.Pacer, + timeSource timeutil.TimeSource, + metrics metricsRecorder, +) Sink { + sink := &batchingSink{ + client: client, + topicNamer: topicNamer, + concreteType: concreteType, + minFlushFrequency: minFlushFrequency, + ioWorkers: numWorkers, + retryOpts: retryOpts, + ts: timeSource, + metrics: metrics, + eventCh: make(chan interface{}, flushQueueDepth), + wg: ctxgroup.WithContext(ctx), + hasher: makeHasher(), + pacerFactory: pacerFactory, + pacer: pacerFactory(), + doneCh: make(chan struct{}), + } + + sink.wg.GoCtx(func(ctx context.Context) error { + sink.runBatchingWorker(ctx) + return nil + }) + return sink +} diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index fde56353bfa3..76032968ded5 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -564,12 +564,19 @@ func (ca *changeAggregator) tick() error { return ca.noteResolvedSpan(resolved) } case kvevent.TypeFlush: - return ca.sink.Flush(ca.Ctx()) + return ca.flushBufferedEvents() } return nil } +func (ca *changeAggregator) flushBufferedEvents() error { + if err := ca.eventConsumer.Flush(ca.Ctx()); err != nil { + return err + } + return ca.sink.Flush(ca.Ctx()) +} + // noteResolvedSpan periodically flushes Frontier progress from the current // changeAggregator node to the changeFrontier node to allow the changeFrontier // to persist the overall changefeed's progress @@ -621,7 +628,7 @@ func (ca *changeAggregator) flushFrontier() error { // otherwise, we could lose buffered messages and violate the // at-least-once guarantee. This is also true for checkpointing the // resolved spans in the job progress.
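+ // With parallel consumers and sink IO, flushing the sink alone is no longer + // sufficient: flushBufferedEvents first drains the event consumer's workers + // and only then flushes the sink itself.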
- if err := ca.sink.Flush(ca.Ctx()); err != nil { + if err := ca.flushBufferedEvents(); err != nil { return err } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index a75f6635f598..ea1414a48dac 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -4877,17 +4877,17 @@ func TestChangefeedErrors(t *testing.T) { `webhook-https://fake-host`, ) sqlDB.ExpectErr( - t, `invalid option value webhook_sink_config, all config values must be non-negative`, + t, `invalid sink config, all values must be non-negative`, `CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Flush": {"Messages": -100, "Frequency": "1s"}}'`, `webhook-https://fake-host`, ) sqlDB.ExpectErr( - t, `invalid option value webhook_sink_config, all config values must be non-negative`, + t, `invalid sink config, all values must be non-negative`, `CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Flush": {"Messages": 100, "Frequency": "-1s"}}'`, `webhook-https://fake-host`, ) sqlDB.ExpectErr( - t, `invalid option value webhook_sink_config, flush frequency is not set, messages may never be sent`, + t, `invalid sink config, Flush.Frequency is not set, messages may never be sent`, `CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Flush": {"Messages": 100}}'`, `webhook-https://fake-host`, ) @@ -4907,17 +4907,17 @@ func TestChangefeedErrors(t *testing.T) { `webhook-https://fake-host`, ) sqlDB.ExpectErr( - t, `max retries must be either a positive int or 'inf' for infinite retries.`, + t, `Retry.Max must be either a positive int or 'inf' for infinite retries.`, `CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Retry": {"Max": "not valid"}}'`, `webhook-https://fake-host`, ) sqlDB.ExpectErr( - t, `max retry count must be a positive integer. use 'inf' for infinite retries.`, + t, `Retry.Max must be a positive integer. use 'inf' for infinite retries.`, `CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Retry": {"Max": 0}}'`, `webhook-https://fake-host`, ) sqlDB.ExpectErr( - t, `max retry count must be a positive integer. use 'inf' for infinite retries.`, + t, `Retry.Max must be a positive integer. use 'inf' for infinite retries.`, `CREATE CHANGEFEED FOR foo INTO $1 WITH webhook_sink_config='{"Retry": {"Max": -1}}'`, `webhook-https://fake-host`, ) diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index abb8292b3867..9bd743c06ee2 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -262,9 +262,9 @@ var EventConsumerPacerRequestSize = settings.RegisterDurationSetting( settings.PositiveDuration, ) -// EventConsumerElasticCPUControlEnabled determines whether changefeed event +// PerEventElasticCPUControlEnabled determines whether changefeed event // processing integrates with elastic CPU control. 
-var EventConsumerElasticCPUControlEnabled = settings.RegisterBoolSetting( +var PerEventElasticCPUControlEnabled = settings.RegisterBoolSetting( settings.TenantWritable, "changefeed.cpu.per_event_elastic_control.enabled", "determines whether changefeed event processing integrates with elastic CPU control", @@ -281,3 +281,27 @@ var RequireExternalConnectionSink = settings.RegisterBoolSetting( " see https://www.cockroachlabs.com/docs/stable/create-external-connection.html", false, ) + +// SinkIOWorkers controls the number of IO workers used by sinks that use +// parallelIO to be able to send multiple requests in parallel. +var SinkIOWorkers = settings.RegisterIntSetting( + settings.TenantWritable, + "changefeed.sink_io_workers", + "the number of workers used by changefeeds when sending requests to the sink "+ + "(currently webhook only): <0 disables, 0 assigns a reasonable default, >0 assigns the setting value.", + 0, +).WithPublic() + +// SinkPacerRequestSize specifies how often (measured in CPU time) +// the Sink batching worker requests CPU time from admission control. For +// example, every N milliseconds of CPU work, request N more milliseconds of CPU +// time. +var SinkPacerRequestSize = settings.RegisterDurationSetting( + settings.TenantWritable, + "changefeed.cpu.sink_encoding_allocation", + "an event consumer worker will perform a blocking request for CPU time "+ + "before consuming events. after fully utilizing this CPU time, it will "+ + "request more", + 50*time.Millisecond, + settings.PositiveDuration, +) diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index 2861515fe03e..7c44cf17d815 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -104,7 +104,7 @@ func newEventConsumer( } pacerRequestUnit := changefeedbase.EventConsumerPacerRequestSize.Get(&cfg.Settings.SV) - enablePacer := changefeedbase.EventConsumerElasticCPUControlEnabled.Get(&cfg.Settings.SV) + enablePacer := changefeedbase.PerEventElasticCPUControlEnabled.Get(&cfg.Settings.SV) makeConsumer := func(s EventSink, frontier frontier) (eventConsumer, error) { var err error @@ -182,7 +182,11 @@ func newEventConsumer( workerChSize: changefeedbase.EventConsumerWorkerQueueSize.Get(&cfg.Settings.SV), spanFrontier: spanFrontier, } - ss := &safeSink{wrapped: sink, beforeFlush: c.Flush} + ss := sink + // Only webhook supports concurrent EmitRow calls + if sink.getConcreteType() != sinkTypeWebhook { + ss = &safeSink{wrapped: sink} + } c.makeConsumer = func() (eventConsumer, error) { return makeConsumer(ss, c) } diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 1752392f5136..2e1f5a4f0483 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -854,6 +854,12 @@ func randomSinkTypeWithOptions(options feedTestOptions) string { for _, weight := range sinkWeights { weightTotal += weight } + if weightTotal == 0 { + // This exists for testing purposes, where one may want to run all tests on + // the same sink and set sinkWeights to be 1 only for that sink, but some + // tests explicitly disallow that sink and therefore have no valid sinks.
+ return "skip" + } p := rand.Float32() * float32(weightTotal) var sum float32 = 0 for sink, weight := range sinkWeights { @@ -862,7 +868,7 @@ func randomSinkTypeWithOptions(options feedTestOptions) string { return sink } } - return "kafka" // unreachable + return "skip" // unreachable } // addCloudStorageOptions adds the options necessary to enable a server to run a @@ -982,6 +988,9 @@ func cdcTestNamedWithSystem( cleanupCloudStorage := addCloudStorageOptions(t, &options) sinkType := randomSinkTypeWithOptions(options) + if sinkType == "skip" { + return + } testLabel := sinkType if name != "" { testLabel = fmt.Sprintf("%s/%s", sinkType, name) diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index cfe92a54a13a..4d9539ebe213 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -32,6 +32,7 @@ const ( changefeedCheckpointHistMaxLatency = 30 * time.Second changefeedBatchHistMaxLatency = 30 * time.Second changefeedFlushHistMaxLatency = 1 * time.Minute + changefeedIOQueueMaxLatency = 5 * time.Minute admitLatencyMaxValue = 1 * time.Minute commitLatencyMaxValue = 10 * time.Minute ) @@ -55,6 +56,8 @@ type AggMetrics struct { Flushes *aggmetric.AggCounter FlushHistNanos *aggmetric.AggHistogram SizeBasedFlushes *aggmetric.AggCounter + ParallelIOQueueNanos *aggmetric.AggHistogram + SinkIOInflight *aggmetric.AggGauge CommitLatency *aggmetric.AggHistogram BackfillCount *aggmetric.AggGauge BackfillPendingRanges *aggmetric.AggGauge @@ -92,6 +95,8 @@ type metricsRecorder interface { getBackfillCallback() func() func() getBackfillRangeCallback() func(int64) (func(), func()) recordSizeBasedFlush() + recordParallelIOQueueLatency(time.Duration) + recordSinkIOInflightChange(int64) } var _ metricsRecorder = (*sliMetrics)(nil) @@ -111,6 +116,8 @@ type sliMetrics struct { Flushes *aggmetric.Counter FlushHistNanos *aggmetric.Histogram SizeBasedFlushes *aggmetric.Counter + ParallelIOQueueNanos *aggmetric.Histogram + SinkIOInflight *aggmetric.Gauge CommitLatency *aggmetric.Histogram ErrorRetries *aggmetric.Counter AdmitLatency *aggmetric.Histogram @@ -244,6 +251,20 @@ func (m *sliMetrics) recordSizeBasedFlush() { m.SizeBasedFlushes.Inc(1) } +func (m *sliMetrics) recordParallelIOQueueLatency(latency time.Duration) { + if m == nil { + return + } + m.ParallelIOQueueNanos.RecordValue(latency.Nanoseconds()) +} +func (m *sliMetrics) recordSinkIOInflightChange(delta int64) { + if m == nil { + return + } + + m.SinkIOInflight.Inc(delta) +} + type wrappingCostController struct { ctx context.Context inner metricsRecorder @@ -314,6 +335,14 @@ func (w *wrappingCostController) recordSizeBasedFlush() { w.inner.recordSizeBasedFlush() } +func (w *wrappingCostController) recordParallelIOQueueLatency(latency time.Duration) { + w.inner.recordParallelIOQueueLatency(latency) +} + +func (w *wrappingCostController) recordSinkIOInflightChange(delta int64) { + w.inner.recordSinkIOInflightChange(delta) +} + var ( metaChangefeedForwardedResolvedMessages = metric.Metadata{ Name: "changefeed.forwarded_resolved_messages", @@ -510,6 +539,18 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { Measurement: "Registrations", Unit: metric.Unit_COUNT, } + metaChangefeedParallelIOQueueNanos := metric.Metadata{ + Name: "changefeed.parallel_io_queue_nanos", + Help: "Time spent with outgoing requests to the sink waiting in queue due to inflight requests with conflicting keys", + Measurement: "Changefeeds", + Unit: metric.Unit_NANOSECONDS, + } + metaChangefeedSinkIOInflight 
:= metric.Metadata{ + Name: "changefeed.sink_io_inflight", + Help: "The number of keys currently inflight as IO requests being sent to the sink", + Measurement: "Messages", + Unit: metric.Unit_COUNT, + } // NB: When adding new histograms, use sigFigs = 1. Older histograms // retain significant figures of 2. b := aggmetric.MakeBuilder("scope") @@ -528,6 +569,14 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { FlushedBytes: b.Counter(metaChangefeedFlushedBytes), Flushes: b.Counter(metaChangefeedFlushes), SizeBasedFlushes: b.Counter(metaSizeBasedFlushes), + ParallelIOQueueNanos: b.Histogram(metric.HistogramOptions{ + Metadata: metaChangefeedParallelIOQueueNanos, + Duration: histogramWindow, + MaxVal: changefeedIOQueueMaxLatency.Nanoseconds(), + SigFigs: 2, + Buckets: metric.BatchProcessLatencyBuckets, + }), + SinkIOInflight: b.Gauge(metaChangefeedSinkIOInflight), BatchHistNanos: b.Histogram(metric.HistogramOptions{ Metadata: metaChangefeedBatchHistNanos, @@ -611,6 +660,8 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { Flushes: a.Flushes.AddChild(scope), FlushHistNanos: a.FlushHistNanos.AddChild(scope), SizeBasedFlushes: a.SizeBasedFlushes.AddChild(scope), + ParallelIOQueueNanos: a.ParallelIOQueueNanos.AddChild(scope), + SinkIOInflight: a.SinkIOInflight.AddChild(scope), CommitLatency: a.CommitLatency.AddChild(scope), ErrorRetries: a.ErrorRetries.AddChild(scope), AdmitLatency: a.AdmitLatency.AddChild(scope), diff --git a/pkg/ccl/changefeedccl/parallel_io.go b/pkg/ccl/changefeedccl/parallel_io.go new file mode 100644 index 000000000000..b986da71af21 --- /dev/null +++ b/pkg/ccl/changefeedccl/parallel_io.go @@ -0,0 +1,282 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "context" + "sync" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/intsets" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// parallelIO allows performing blocking "IOHandler" calls in parallel. +// IORequests implement a Keys() function returning keys on which ordering is +// preserved. +// Example: if the events [[a,b], [b,c], [c,d], [e,f]] are all submitted in that +// order, [a,b] and [e,f] can be emitted concurrently while [b,c] will block +// until [a,b] completes, then [c,d] will block until [b,c] completes. If [b,c] +// errored, [c,d] would never be sent, and an error would be returned with [c,d] +// in an ioResult struct sent to resultCh. After sending an error to resultCh +// all workers are torn down and no further requests are received or handled. +type parallelIO struct { + retryOpts retry.Options + wg ctxgroup.Group + metrics metricsRecorder + doneCh chan struct{} + + ioHandler IOHandler + + requestCh chan IORequest + resultCh chan *ioResult // readers should freeIOResult after handling result events +} + +// IORequest represents an abstract unit of IO that has a set of keys upon which +// sequential ordering of fulfillment must be enforced.
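+// +// For instance, sinkBatch satisfies this by adding a 32-bit hash of each +// appended row key to an intsets.Fast, so two requests conflict whenever any +// hashed key overlaps; a minimal sketch of the pattern (rowKey is +// illustrative): +// +// var keys intsets.Fast +// keys.Add(hashToInt(hasher, rowKey)) // once per appended row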
+type IORequest interface { + Keys() intsets.Fast +} + +// ioResult stores the full request that was sent as well as an error if even +// after retries the IOHandler was unable to succeed. +type ioResult struct { + request IORequest + err error +} + +var resultPool sync.Pool = sync.Pool{ + New: func() interface{} { + return new(ioResult) + }, +} + +func newIOResult(req IORequest, err error) *ioResult { + res := resultPool.Get().(*ioResult) + res.request = req + res.err = err + return res +} +func freeIOResult(e *ioResult) { + *e = ioResult{} + resultPool.Put(e) +} + +type queuedRequest struct { + req IORequest + admitTime time.Time +} + +// IOHandler performs a blocking IO operation on an IORequest +type IOHandler func(context.Context, IORequest) error + +func newParallelIO( + ctx context.Context, + retryOpts retry.Options, + numWorkers int, + handler IOHandler, + metrics metricsRecorder, +) *parallelIO { + wg := ctxgroup.WithContext(ctx) + io := &parallelIO{ + retryOpts: retryOpts, + wg: wg, + metrics: metrics, + ioHandler: handler, + requestCh: make(chan IORequest, numWorkers), + resultCh: make(chan *ioResult, numWorkers), + doneCh: make(chan struct{}), + } + + wg.GoCtx(func(ctx context.Context) error { + return io.processIO(ctx, numWorkers) + }) + + return io +} + +// Close stops all workers immediately and returns once they shut down. Inflight +// requests sent to requestCh may never result in being sent to resultCh. +func (p *parallelIO) Close() { + close(p.doneCh) + _ = p.wg.Wait() +} + +// processIO starts numEmitWorkers worker threads to run the IOHandler on +// non-conflicting IORequests each retrying according to the retryOpts, then: +// - Reads incoming messages from requestCh, sending them to any worker if there +// aren't any conflicting messages and queueing them up to be sent later +// otherwise. +// - Reads results from the workers and forwards the information to resultCh, at +// this point also sending the first pending request that would now be sendable. +// +// ┌───────────┐ +// ┌►│io worker 1├──┐ +// │ └───────────┘ │ +// <-requestCh──────────┤► ... ├─┬─► resultCh<- +// │ ▲ │ ┌───────────┐ │ │ +// │conflict │ └►│io worker n├──┘ │ +// │ │ └───────────┘ │ +// ▼ │ │check pending +// ┌─────────┐ │no conflict │ +// │ pending ├───────┘ │ +// └─────────┘ │ +// ▲ │ +// └──────────────────────────────────┘ +// +// The conflict checking is done via an intsets.Fast storing the union of all +// keys currently being sent, followed by checking each pending batch's intset. +func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error { + emitWithRetries := func(ctx context.Context, payload IORequest) error { + initialSend := true + return retry.WithMaxAttempts(ctx, p.retryOpts, p.retryOpts.MaxRetries+1, func() error { + if !initialSend { + p.metrics.recordInternalRetry(int64(payload.Keys().Len()), false) + } + initialSend = false + return p.ioHandler(ctx, payload) + }) + } + + // Multiple worker routines handle the IO operations, retrying when necessary.
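+ // The request and result channels below are buffered to numEmitWorkers, and + // any attempt after a request's first send is counted via recordInternalRetry + // above.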
+ workerEmitCh := make(chan IORequest, numEmitWorkers) + defer close(workerEmitCh) + workerResultCh := make(chan *ioResult, numEmitWorkers) + + for i := 0; i < numEmitWorkers; i++ { + p.wg.GoCtx(func(ctx context.Context) error { + for req := range workerEmitCh { + result := newIOResult(req, emitWithRetries(ctx, req)) + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.doneCh: + return nil + case workerResultCh <- result: + } + } + return nil + }) + } + + // submitIO, which sends requests to the workers, has to select on both + // workerEmitCh<- and <-workerResultCh, since without the <-workerResultCh + // there could be a deadlock where all workers are blocked on emitting results + // and thereby unable to accept new work. If a result is received, it also + // cannot immediately call handleResult on it as that could cause a re-entrant + // submitIO -> handleResult -> submitIO -> handleResult chain which is complex + // to manage. To avoid this, results are added to a pending list to be handled + // separately. + var pendingResults []*ioResult + submitIO := func(req IORequest) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.doneCh: + return nil + case workerEmitCh <- req: + return nil + case res := <-workerResultCh: + pendingResults = append(pendingResults, res) + } + } + } + + // The main routine keeps track of incoming and completed requests, where + // admitted requests yet to be completed have their Keys() tracked in an + // intset, and any incoming request with keys already in the intset are placed + // in a queue to be sent to IO workers once the conflicting requests complete. + var inflight intsets.Fast + var pending []queuedRequest + + handleResult := func(res *ioResult) error { + if res.err == nil { + // Clear out the completed keys to check for newly valid pending requests. + inflight.DifferenceWith(res.request.Keys()) + + // Check for a pending request that is now able to be sent, i.e. is not + // conflicting with any inflight requests or any requests that arrived + // earlier than itself in the pending queue. + pendingKeys := intsets.Fast{} + for i, pendingReq := range pending { + if !inflight.Intersects(pendingReq.req.Keys()) && !pendingKeys.Intersects(pendingReq.req.Keys()) { + inflight.UnionWith(pendingReq.req.Keys()) + pending = append(pending[:i], pending[i+1:]...) + p.metrics.recordParallelIOQueueLatency(timeutil.Since(pendingReq.admitTime)) + if err := submitIO(pendingReq.req); err != nil { + return err + } + break + } + + pendingKeys.UnionWith(pendingReq.req.Keys()) + } + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.doneCh: + return nil + case p.resultCh <- res: + return nil + } + } + + // If any yet-to-be-handled request observed so far shares any keys with an + // incoming request, the incoming request cannot be sent out, as it could + // otherwise arrive earlier.
+ hasConflictingKeys := func(keys intsets.Fast) bool { + if inflight.Intersects(keys) { + return true + } + for _, pendingReq := range pending { + if pendingReq.req.Keys().Intersects(keys) { + return true + } + } + return false + } + + for { + // Handle any results that arrived during any submitIO attempts + unhandled := pendingResults + pendingResults = nil + for _, res := range unhandled { + if err := handleResult(res); err != nil { + return err + } + } + + select { + case req := <-p.requestCh: + if hasConflictingKeys(req.Keys()) { + // If a request conflicts with any currently unhandled requests, add it + // to the pending queue to be rechecked for validity later. + pending = append(pending, queuedRequest{req: req, admitTime: timeutil.Now()}) + } else { + inflight.UnionWith(req.Keys()) + if err := submitIO(req); err != nil { + return err + } + } + case res := <-workerResultCh: + if err := handleResult(res); err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + case <-p.doneCh: + return nil + } + } +} diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 70eff852d348..12d41d2b0eb3 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -10,7 +10,11 @@ package changefeedccl import ( "context" + "encoding/json" + "math" "net/url" + "runtime" + "strconv" "strings" "time" @@ -19,13 +23,19 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/admission" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -146,6 +156,16 @@ func getAndDialSink( return sink, sink.Dial() } +// NewWebhookSinkEnabled determines whether or not the refactored Webhook sink +// or the deprecated sink should be used. 
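+// +// Operators can opt in at runtime with, for example: +// +// SET CLUSTER SETTING changefeed.new_webhook_sink_enabled = true;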
+var NewWebhookSinkEnabled = settings.RegisterBoolSetting( + settings.TenantWritable, + "changefeed.new_webhook_sink_enabled", + "if enabled, this setting enables a new implementation of the webhook sink"+ + " that allows for a much higher throughput", + util.ConstantWithMetamorphicTestBool("changefeed.new_webhook_sink_enabled", false), +) + func getSink( ctx context.Context, serverCfg *execinfra.ServerConfig, @@ -207,10 +227,17 @@ func getSink( if err != nil { return nil, err } - return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) { - return makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts, - defaultWorkerCount(), timeutil.DefaultTimeSource{}, metricsBuilder) - }) + if NewWebhookSinkEnabled.Get(&serverCfg.Settings.SV) { + return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) { + return makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts, + numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{}, metricsBuilder) + }) + } else { + return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) { + return makeDeprecatedWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts, + defaultWorkerCount(), timeutil.DefaultTimeSource{}, metricsBuilder) + }) + } case isPubsubSink(u): // TODO: add metrics to pubsubsink return MakePubsubSink(ctx, u, encodingOpts, AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered)) @@ -606,12 +633,10 @@ func (n *nullSink) Dial() error { } // safeSink wraps an EventSink in a mutex so it's methods are -// thread safe. It also has a beforeFlush hook which is called -// at the beginning of safeSink.Flush(). +// thread safe. type safeSink struct { syncutil.Mutex - beforeFlush func(ctx context.Context) error - wrapped EventSink + wrapped EventSink } var _ EventSink = (*safeSink)(nil) @@ -645,9 +670,6 @@ func (s *safeSink) EmitRow( } func (s *safeSink) Flush(ctx context.Context) error { - if err := s.beforeFlush(ctx); err != nil { - return err - } s.Lock() defer s.Unlock() return s.wrapped.Flush(ctx) @@ -673,3 +695,161 @@ type SinkWithEncoder interface { Flush(ctx context.Context) error } + +// proper JSON schema for sink config: +// +// { +// "Flush": { +// "Messages": ..., +// "Bytes": ..., +// "Frequency": ..., +// }, +// "Retry": { +// "Max": ..., +// "Backoff": ..., +// } +// } +type sinkJSONConfig struct { + Flush sinkBatchConfig `json:",omitempty"` + Retry sinkRetryConfig `json:",omitempty"` +} + +type sinkBatchConfig struct { + Bytes, Messages int `json:",omitempty"` + Frequency jsonDuration `json:",omitempty"` +} + +// wrapper structs to unmarshal json, retry.Options will be the actual config +type sinkRetryConfig struct { + Max jsonMaxRetries `json:",omitempty"` + Backoff jsonDuration `json:",omitempty"` +} + +func defaultRetryConfig() retry.Options { + opts := retry.Options{ + InitialBackoff: 500 * time.Millisecond, + MaxRetries: 3, + Multiplier: 2, + } + // max backoff should be initial * 2 ^ maxRetries + opts.MaxBackoff = opts.InitialBackoff * time.Duration(int(math.Pow(2.0, float64(opts.MaxRetries)))) + return opts +} + +func getSinkConfigFromJson( + jsonStr changefeedbase.SinkSpecificJSONConfig, +) (batchCfg sinkBatchConfig, retryCfg retry.Options, err error) { + retryCfg = defaultRetryConfig() + + var cfg = sinkJSONConfig{} + cfg.Retry.Max = jsonMaxRetries(retryCfg.MaxRetries) + cfg.Retry.Backoff = jsonDuration(retryCfg.InitialBackoff) + if jsonStr != `` { + // set retry defaults to 
be overridden if included in JSON + if err = json.Unmarshal([]byte(jsonStr), &cfg); err != nil { + return batchCfg, retryCfg, errors.Wrapf(err, "error unmarshalling json") + } + } + + // don't support negative values + if cfg.Flush.Messages < 0 || cfg.Flush.Bytes < 0 || cfg.Flush.Frequency < 0 || + cfg.Retry.Max < 0 || cfg.Retry.Backoff < 0 { + return batchCfg, retryCfg, errors.Errorf("invalid sink config, all values must be non-negative") + } + + // errors if other batch values are set, but frequency is not + if (cfg.Flush.Messages > 0 || cfg.Flush.Bytes > 0) && cfg.Flush.Frequency == 0 { + return batchCfg, retryCfg, errors.Errorf("invalid sink config, Flush.Frequency is not set, messages may never be sent") + } + + retryCfg.MaxRetries = int(cfg.Retry.Max) + retryCfg.InitialBackoff = time.Duration(cfg.Retry.Backoff) + return cfg.Flush, retryCfg, nil +} + +type jsonMaxRetries int + +func (j *jsonMaxRetries) UnmarshalJSON(b []byte) error { + var i int64 + // try to parse as int + i, err := strconv.ParseInt(string(b), 10, 64) + if err == nil { + if i <= 0 { + return errors.Errorf("Retry.Max must be a positive integer. use 'inf' for infinite retries.") + } + *j = jsonMaxRetries(i) + } else { + // if that fails, try to parse as string (only accept 'inf') + var s string + // using unmarshal here to remove quotes around the string + if err := json.Unmarshal(b, &s); err != nil { + return errors.Errorf("Retry.Max must be either a positive int or 'inf' for infinite retries.") + } + if strings.ToLower(s) == "inf" { + // if the user wants infinite retries, set to zero as retry.Options interprets this as infinity + *j = 0 + } else if n, err := strconv.Atoi(s); err == nil { // also accept ints as strings + *j = jsonMaxRetries(n) + } else { + return errors.Errorf("Retry.Max must be either a positive int or 'inf' for infinite retries.") + } + } + return nil +} + +func numSinkIOWorkers(cfg *execinfra.ServerConfig) int { + numWorkers := changefeedbase.SinkIOWorkers.Get(&cfg.Settings.SV) + if numWorkers > 0 { + return int(numWorkers) + } + + idealNumber := runtime.GOMAXPROCS(0) + if idealNumber < 1 { + return 1 + } + if idealNumber > 32 { + return 32 + } + return idealNumber +} + +func newCPUPacerFactory(ctx context.Context, cfg *execinfra.ServerConfig) func() *admission.Pacer { + var prevPacer *admission.Pacer + var prevRequestUnit time.Duration + + return func() *admission.Pacer { + pacerRequestUnit := changefeedbase.SinkPacerRequestSize.Get(&cfg.Settings.SV) + enablePacer := changefeedbase.PerEventElasticCPUControlEnabled.Get(&cfg.Settings.SV) + + if enablePacer && prevPacer != nil && prevRequestUnit == pacerRequestUnit { + return prevPacer + } + + var pacer *admission.Pacer = nil + if enablePacer { + tenantID, ok := roachpb.ClientTenantFromContext(ctx) + if !ok { + tenantID = roachpb.SystemTenantID + } + + pacer = cfg.AdmissionPacerFactory.NewPacer( + pacerRequestUnit, + admission.WorkInfo{ + TenantID: tenantID, + Priority: admissionpb.BulkNormalPri, + CreateTime: timeutil.Now().UnixNano(), + BypassAdmission: false, + }, + ) + } + + prevPacer = pacer + prevRequestUnit = pacerRequestUnit + + return pacer + } +} + +func nilPacerFactory() *admission.Pacer { + return nil +} diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go index eb663f021efc..25ae999c29ea 100644 --- a/pkg/ccl/changefeedccl/sink_test.go +++ b/pkg/ccl/changefeedccl/sink_test.go @@ -764,3 +764,86 @@ func TestKafkaSinkTracksMemory(t *testing.T) { require.EqualValues(t, 0, p.outstanding()) require.EqualValues(t, 0,
pool.used()) } + +func TestSinkConfigParsing(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + t.Run("handles valid types", func(t *testing.T) { + opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234, "Frequency": "3s", "Bytes":30}, "Retry": {"Max": 5, "Backoff": "3h"}}`) + batch, retry, err := getSinkConfigFromJson(opts) + require.NoError(t, err) + require.Equal(t, batch, sinkBatchConfig{ + Bytes: 30, + Messages: 1234, + Frequency: jsonDuration(3 * time.Second), + }) + require.Equal(t, retry.MaxRetries, 5) + require.Equal(t, retry.InitialBackoff, 3*time.Hour) + + // Max accepts both values and specifically the string "inf" + opts = changefeedbase.SinkSpecificJSONConfig(`{"Retry": {"Max": "inf"}}`) + _, retry, err = getSinkConfigFromJson(opts) + require.NoError(t, err) + require.Equal(t, retry.MaxRetries, 0) + }) + + t.Run("provides retry defaults", func(t *testing.T) { + defaultRetry := defaultRetryConfig() + + opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234, "Frequency": "3s"}}`) + _, retry, err := getSinkConfigFromJson(opts) + require.NoError(t, err) + + require.Equal(t, retry.MaxRetries, defaultRetry.MaxRetries) + require.Equal(t, retry.InitialBackoff, defaultRetry.InitialBackoff) + + opts = changefeedbase.SinkSpecificJSONConfig(`{"Retry": {"Max": "inf"}}`) + _, retry, err = getSinkConfigFromJson(opts) + require.NoError(t, err) + require.Equal(t, retry.MaxRetries, 0) + require.Equal(t, retry.InitialBackoff, defaultRetry.InitialBackoff) + }) + + t.Run("errors on invalid configuration", func(t *testing.T) { + opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": -1234, "Frequency": "3s"}}`) + _, _, err := getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "invalid sink config, all values must be non-negative") + + opts = changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234, "Frequency": "-3s"}}`) + _, _, err = getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "invalid sink config, all values must be non-negative") + + opts = changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 10}}`) + _, _, err = getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "invalid sink config, Flush.Frequency is not set, messages may never be sent") + }) + + t.Run("errors on invalid type", func(t *testing.T) { + opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": "1234", "Frequency": "3s", "Bytes":30}}`) + _, _, err := getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "Flush.Messages of type int") + + opts = changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234, "Frequency": "3s", "Bytes":"30"}}`) + _, _, err = getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "Flush.Bytes of type int") + + opts = changefeedbase.SinkSpecificJSONConfig(`{"Retry": {"Max": true}}`) + _, _, err = getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "Retry.Max must be either a positive int or 'inf'") + + opts = changefeedbase.SinkSpecificJSONConfig(`{"Retry": {"Max": "not-inf"}}`) + _, _, err = getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "Retry.Max must be either a positive int or 'inf'") + }) + + t.Run("errors on malformed json", func(t *testing.T) { + opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1234 "Frequency": "3s"}}`) + _, _, err := getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "invalid character '\"'") + + opts = changefeedbase.SinkSpecificJSONConfig(`string`) + _, _, 
err = getSinkConfigFromJson(opts) + require.ErrorContains(t, err, "invalid character 's' looking for beginning of value") + }) +} diff --git a/pkg/ccl/changefeedccl/sink_webhook.go b/pkg/ccl/changefeedccl/sink_webhook.go index 09fd6cb508a2..f2ca73d125de 100644 --- a/pkg/ccl/changefeedccl/sink_webhook.go +++ b/pkg/ccl/changefeedccl/sink_webhook.go @@ -17,11 +17,9 @@ import ( "fmt" "hash/crc32" "io" - "math" "net" "net/http" "net/url" - "strconv" "strings" "time" @@ -36,23 +34,7 @@ import ( "github.com/cockroachdb/errors" ) -const ( - applicationTypeJSON = `application/json` - applicationTypeCSV = `text/csv` - authorizationHeader = `Authorization` -) - -func isWebhookSink(u *url.URL) bool { - switch u.Scheme { - // allow HTTP here but throw an error later to make it clear HTTPS is required - case changefeedbase.SinkSchemeWebhookHTTP, changefeedbase.SinkSchemeWebhookHTTPS: - return true - default: - return false - } -} - -type webhookSink struct { +type deprecatedWebhookSink struct { // Webhook configuration. parallelism int retryCfg retry.Options @@ -80,11 +62,11 @@ type webhookSink struct { workerCtx context.Context workerGroup ctxgroup.Group exitWorkers func() // Signaled to shut down all workers. - eventsChans []chan []messagePayload + eventsChans []chan []deprecatedMessagePayload metrics metricsRecorder } -func (s *webhookSink) getConcreteType() sinkType { +func (s *deprecatedWebhookSink) getConcreteType() sinkType { return sinkTypeWebhook } @@ -100,7 +82,7 @@ type encodedPayload struct { mvcc hlc.Timestamp } -func encodePayloadJSONWebhook(messages []messagePayload) (encodedPayload, error) { +func encodePayloadJSONWebhook(messages []deprecatedMessagePayload) (encodedPayload, error) { result := encodedPayload{ emitTime: timeutil.Now(), } @@ -129,7 +111,7 @@ func encodePayloadJSONWebhook(messages []messagePayload) (encodedPayload, error) return result, err } -func encodePayloadCSVWebhook(messages []messagePayload) (encodedPayload, error) { +func encodePayloadCSVWebhook(messages []deprecatedMessagePayload) (encodedPayload, error) { result := encodedPayload{ emitTime: timeutil.Now(), } @@ -150,7 +132,7 @@ func encodePayloadCSVWebhook(messages []messagePayload) (encodedPayload, error) return result, nil } -type messagePayload struct { +type deprecatedMessagePayload struct { // Payload message fields. key []byte val []byte @@ -162,15 +144,15 @@ type messagePayload struct { // webhookMessage contains either messagePayload or a flush request. type webhookMessage struct { flushDone *chan struct{} - payload messagePayload + payload deprecatedMessagePayload } type batch struct { - buffer []messagePayload + buffer []deprecatedMessagePayload bufferBytes int } -func (b *batch) addToBuffer(m messagePayload) { +func (b *batch) addToBuffer(m deprecatedMessagePayload) { b.bufferBytes += len(m.val) b.buffer = append(b.buffer, m) } @@ -185,36 +167,6 @@ type batchConfig struct { Frequency jsonDuration `json:",omitempty"` } -type jsonMaxRetries int - -func (j *jsonMaxRetries) UnmarshalJSON(b []byte) error { - var i int64 - // try to parse as int - i, err := strconv.ParseInt(string(b), 10, 64) - if err == nil { - if i <= 0 { - return errors.Errorf("max retry count must be a positive integer. 
use 'inf' for infinite retries.") - } - *j = jsonMaxRetries(i) - } else { - // if that fails, try to parse as string (only accept 'inf') - var s string - // using unmarshal here to remove quotes around the string - if err := json.Unmarshal(b, &s); err != nil { - return err - } - if strings.ToLower(s) == "inf" { - // if used wants infinite retries, set to zero as retry.Options interprets this as infinity - *j = 0 - } else if n, err := strconv.Atoi(s); err == nil { // also accept ints as strings - *j = jsonMaxRetries(n) - } else { - return errors.Errorf("max retries must be either a positive int or 'inf' for infinite retries.") - } - } - return nil -} - // wrapper structs to unmarshal json, retry.Options will be the actual config type retryConfig struct { Max jsonMaxRetries `json:",omitempty"` @@ -239,7 +191,7 @@ type webhookSinkConfig struct { Retry retryConfig `json:",omitempty"` } -func (s *webhookSink) getWebhookSinkConfig( +func (s *deprecatedWebhookSink) getWebhookSinkConfig( jsonStr changefeedbase.SinkSpecificJSONConfig, ) (batchCfg batchConfig, retryCfg retry.Options, err error) { retryCfg = defaultRetryConfig() @@ -257,12 +209,12 @@ func (s *webhookSink) getWebhookSinkConfig( // don't support negative values if cfg.Flush.Messages < 0 || cfg.Flush.Bytes < 0 || cfg.Flush.Frequency < 0 || cfg.Retry.Max < 0 || cfg.Retry.Backoff < 0 { - return batchCfg, retryCfg, errors.Errorf("invalid option value %s, all config values must be non-negative", changefeedbase.OptWebhookSinkConfig) + return batchCfg, retryCfg, errors.Errorf("invalid sink config, all values must be non-negative") } // errors if other batch values are set, but frequency is not if (cfg.Flush.Messages > 0 || cfg.Flush.Bytes > 0) && cfg.Flush.Frequency == 0 { - return batchCfg, retryCfg, errors.Errorf("invalid option value %s, flush frequency is not set, messages may never be sent", changefeedbase.OptWebhookSinkConfig) + return batchCfg, retryCfg, errors.Errorf("invalid sink config, Flush.Frequency is not set, messages may never be sent") } retryCfg.MaxRetries = int(cfg.Retry.Max) @@ -270,7 +222,7 @@ func (s *webhookSink) getWebhookSinkConfig( return cfg.Flush, retryCfg, nil } -func makeWebhookSink( +func makeDeprecatedWebhookSink( ctx context.Context, u sinkURL, encodingOpts changefeedbase.EncodingOptions, @@ -312,7 +264,7 @@ func makeWebhookSink( ctx, cancel := context.WithCancel(ctx) - sink := &webhookSink{ + sink := &deprecatedWebhookSink{ workerCtx: ctx, authHeader: opts.AuthHeader, exitWorkers: cancel, @@ -329,7 +281,7 @@ func makeWebhookSink( } // TODO(yevgeniy): Establish HTTP connection in Dial(). 
- sink.client, err = makeWebhookClient(u, connTimeout) + sink.client, err = deprecatedMakeWebhookClient(u, connTimeout) if err != nil { return nil, err } @@ -350,7 +302,7 @@ func makeWebhookSink( return sink, nil } -func makeWebhookClient(u sinkURL, timeout time.Duration) (*httputil.Client, error) { +func deprecatedMakeWebhookClient(u sinkURL, timeout time.Duration) (*httputil.Client, error) { client := &httputil.Client{ Client: &http.Client{ Timeout: timeout, @@ -417,30 +369,19 @@ func makeWebhookClient(u sinkURL, timeout time.Duration) (*httputil.Client, erro return client, nil } -func defaultRetryConfig() retry.Options { - opts := retry.Options{ - InitialBackoff: 500 * time.Millisecond, - MaxRetries: 3, - Multiplier: 2, - } - // max backoff should be initial * 2 ^ maxRetries - opts.MaxBackoff = opts.InitialBackoff * time.Duration(int(math.Pow(2.0, float64(opts.MaxRetries)))) - return opts -} - // defaultWorkerCount() is the number of CPU's on the machine func defaultWorkerCount() int { return system.NumCPU() } -func (s *webhookSink) Dial() error { +func (s *deprecatedWebhookSink) Dial() error { s.setupWorkers() return nil } -func (s *webhookSink) setupWorkers() { +func (s *deprecatedWebhookSink) setupWorkers() { // setup events channels to send to workers and the worker group - s.eventsChans = make([]chan []messagePayload, s.parallelism) + s.eventsChans = make([]chan []deprecatedMessagePayload, s.parallelism) s.workerGroup = ctxgroup.WithContext(s.workerCtx) s.batchChan = make(chan webhookMessage) @@ -455,7 +396,7 @@ func (s *webhookSink) setupWorkers() { return nil }) for i := 0; i < s.parallelism; i++ { - s.eventsChans[i] = make(chan []messagePayload) + s.eventsChans[i] = make(chan []deprecatedMessagePayload) j := i s.workerGroup.GoCtx(func(ctx context.Context) error { s.workerLoop(j) @@ -464,7 +405,7 @@ func (s *webhookSink) setupWorkers() { } } -func (s *webhookSink) shouldSendBatch(b batch) bool { +func (s *deprecatedWebhookSink) shouldSendBatch(b batch) bool { // similar to sarama, send batch if: // everything is zero (default) // any one of the conditions are met UNLESS the condition is zero which means never batch @@ -483,8 +424,8 @@ func (s *webhookSink) shouldSendBatch(b batch) bool { } } -func (s *webhookSink) splitAndSendBatch(batch []messagePayload) error { - workerBatches := make([][]messagePayload, s.parallelism) +func (s *deprecatedWebhookSink) splitAndSendBatch(batch []deprecatedMessagePayload) error { + workerBatches := make([][]deprecatedMessagePayload, s.parallelism) for _, msg := range batch { // split batch into per-worker batches i := s.workerIndex(msg.key) @@ -504,7 +445,7 @@ func (s *webhookSink) splitAndSendBatch(batch []messagePayload) error { } // flushWorkers sends flush request to each worker and waits for each one to acknowledge. -func (s *webhookSink) flushWorkers(done chan struct{}) error { +func (s *deprecatedWebhookSink) flushWorkers(done chan struct{}) error { for i := 0; i < len(s.eventsChans); i++ { // Ability to write a nil message to events channel indicates that // the worker has processed all other messages. 
@@ -525,7 +466,7 @@ func (s *webhookSink) flushWorkers(done chan struct{}) error { // batchWorker ingests messages from EmitRow into a batch and splits them into // per-worker batches to be sent separately -func (s *webhookSink) batchWorker() { +func (s *deprecatedWebhookSink) batchWorker() { var batchTracker batch batchTimer := s.ts.NewTimer() defer batchTimer.Stop() @@ -578,7 +519,7 @@ func (s *webhookSink) batchWorker() { } } -func (s *webhookSink) workerLoop(workerIndex int) { +func (s *deprecatedWebhookSink) workerLoop(workerIndex int) { for { select { case <-s.workerCtx.Done(): @@ -613,14 +554,14 @@ func (s *webhookSink) workerLoop(workerIndex int) { } } -func (s *webhookSink) sendMessageWithRetries(ctx context.Context, reqBody []byte) error { +func (s *deprecatedWebhookSink) sendMessageWithRetries(ctx context.Context, reqBody []byte) error { requestFunc := func() error { return s.sendMessage(ctx, reqBody) } return retry.WithMaxAttempts(ctx, s.retryCfg, s.retryCfg.MaxRetries+1, requestFunc) } -func (s *webhookSink) sendMessage(ctx context.Context, reqBody []byte) error { +func (s *deprecatedWebhookSink) sendMessage(ctx context.Context, reqBody []byte) error { req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url.String(), bytes.NewReader(reqBody)) if err != nil { return err @@ -658,13 +599,13 @@ func (s *webhookSink) sendMessage(ctx context.Context, reqBody []byte) error { // deterministically assigned to the same worker. Since we have a channel per // worker, we can ensure per-worker ordering and therefore guarantee per-key // ordering. -func (s *webhookSink) workerIndex(key []byte) uint32 { +func (s *deprecatedWebhookSink) workerIndex(key []byte) uint32 { return crc32.ChecksumIEEE(key) % uint32(s.parallelism) } // exitWorkersWithError saves the first error message encountered by webhook workers, // and requests all workers to terminate. -func (s *webhookSink) exitWorkersWithError(err error) { +func (s *deprecatedWebhookSink) exitWorkersWithError(err error) { // errChan has buffer size 1, first error will be saved to the buffer and // subsequent errors will be ignored select { @@ -675,7 +616,7 @@ func (s *webhookSink) exitWorkersWithError(err error) { } // sinkError checks to see if any errors occurred inside workers go routines. -func (s *webhookSink) sinkError() error { +func (s *deprecatedWebhookSink) sinkError() error { select { case err := <-s.errChan: return err @@ -684,7 +625,7 @@ func (s *webhookSink) sinkError() error { } } -func (s *webhookSink) EmitRow( +func (s *deprecatedWebhookSink) EmitRow( ctx context.Context, topic TopicDescriptor, key, value []byte, @@ -702,7 +643,7 @@ func (s *webhookSink) EmitRow( case err := <-s.errChan: return err case s.batchChan <- webhookMessage{ - payload: messagePayload{ + payload: deprecatedMessagePayload{ key: key, val: value, alloc: alloc, @@ -714,7 +655,7 @@ func (s *webhookSink) EmitRow( return nil } -func (s *webhookSink) EmitResolvedTimestamp( +func (s *deprecatedWebhookSink) EmitResolvedTimestamp( ctx context.Context, encoder Encoder, resolved hlc.Timestamp, ) error { defer s.metrics.recordResolvedCallback()() @@ -745,7 +686,7 @@ func (s *webhookSink) EmitResolvedTimestamp( return nil } -func (s *webhookSink) Flush(ctx context.Context) error { +func (s *deprecatedWebhookSink) Flush(ctx context.Context) error { s.metrics.recordFlushRequestCallback()() // Send flush request. 
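The workerIndex method above is what makes the per-key ordering guarantee work: hashing the message key with CRC-32 (IEEE) and reducing it modulo the worker count routes every update for a given key to the same worker, so one channel per worker is enough to preserve per-key order. A minimal, illustrative demo of that assignment (not part of the changefeedccl package):

package main

import (
	"fmt"
	"hash/crc32"
)

// workerIndex reproduces the hash-partitioning used by the sink: identical
// keys always map to the same worker index.
func workerIndex(key []byte, parallelism int) uint32 {
	return crc32.ChecksumIEEE(key) % uint32(parallelism)
}

func main() {
	const parallelism = 4
	for _, key := range []string{"[1001]", "[1001]", "[2002]"} {
		fmt.Printf("key %s -> worker %d\n", key, workerIndex([]byte(key), parallelism))
	}
	// The two "[1001]" rows land on the same worker, so their relative
	// order is preserved end to end.
}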
@@ -768,7 +709,7 @@ func (s *webhookSink) Flush(ctx context.Context) error { } } -func (s *webhookSink) Close() error { +func (s *deprecatedWebhookSink) Close() error { s.exitWorkers() // ignore errors here since we're closing the sink anyway _ = s.workerGroup.Wait() diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go index 199734be6bf5..65356260c7e6 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_test.go +++ b/pkg/ccl/changefeedccl/sink_webhook_test.go @@ -15,6 +15,7 @@ import ( "net/http" "net/url" "strings" + "sync/atomic" "testing" "time" @@ -83,7 +84,7 @@ func setupWebhookSinkWithDetails( if err != nil { return nil, err } - sinkSrc, err := makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, sinkOpts, parallelism, source, nilMetricsRecorderBuilder) + sinkSrc, err := makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, sinkOpts, parallelism, nilPacerFactory, source, nilMetricsRecorderBuilder) if err != nil { return nil, err } @@ -172,8 +173,6 @@ func TestWebhookSink(t *testing.T) { require.NoError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) require.Regexp(t, "x509", sinkSrcNoCert.Flush(context.Background())) - require.EqualError(t, sinkSrcNoCert.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), - `context canceled`) params.Set(changefeedbase.SinkParamSkipTLSVerify, "true") sinkDestHost.RawQuery = params.Encode() @@ -191,8 +190,6 @@ func TestWebhookSink(t *testing.T) { err = sinkSrc.Flush(context.Background()) require.Error(t, err) require.Contains(t, err.Error(), fmt.Sprintf(`Post "%s":`, sinkDest.URL())) - require.EqualError(t, sinkSrc.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), - `context canceled`) sinkDestHTTP, err := cdctest.StartMockWebhookSinkInsecure() require.NoError(t, err) @@ -207,8 +204,6 @@ func TestWebhookSink(t *testing.T) { require.EqualError(t, sinkSrcWrongProtocol.Flush(context.Background()), fmt.Sprintf(`Post "%s": http: server gave HTTP response to HTTPS client`, fmt.Sprintf("https://%s", strings.TrimPrefix(sinkDestHTTP.URL(), "http://")))) - require.EqualError(t, sinkSrcWrongProtocol.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), - `context canceled`) sinkDestSecure, err := cdctest.StartMockWebhookSinkSecure(cert) require.NoError(t, err) @@ -230,6 +225,7 @@ func TestWebhookSink(t *testing.T) { Opts: opts, } + require.NoError(t, sinkSrc.Close()) sinkSrc, err = setupWebhookSinkWithDetails(context.Background(), details, parallelism, timeutil.DefaultTimeSource{}) require.NoError(t, err) @@ -305,8 +301,6 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) { require.NoError(t, sinkSrcNoCreds.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) require.EqualError(t, sinkSrcNoCreds.Flush(context.Background()), "401 Unauthorized: ") - require.EqualError(t, sinkSrcNoCreds.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), - `context canceled`) // wrong credentials should result in a 401 as well var wrongAuthHeader string @@ -318,8 +312,6 @@ func TestWebhookSinkWithAuthOptions(t *testing.T) { require.NoError(t, sinkSrcWrongCreds.EmitRow(context.Background(), nil, []byte("[1001]"), 
[]byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, zeroAlloc)) require.EqualError(t, sinkSrcWrongCreds.Flush(context.Background()), "401 Unauthorized: ") - require.EqualError(t, sinkSrcWrongCreds.EmitRow(context.Background(), nil, nil, nil, zeroTS, zeroTS, zeroAlloc), - `context canceled`) require.NoError(t, sinkSrc.Close()) require.NoError(t, sinkSrcNoCreds.Close()) @@ -603,6 +595,13 @@ func TestWebhookSinkConfig(t *testing.T) { sinkSrc, err := setupWebhookSinkWithDetails(context.Background(), details, parallelism, mt) require.NoError(t, err) + batchingSink, ok := sinkSrc.(*batchingSink) + require.True(t, ok) + var appendCount int32 = 0 + batchingSink.knobs.OnAppend = func(event *rowEvent) { + atomic.AddInt32(&appendCount, 1) + } + // send incomplete batch require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1000},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) require.NoError(t, sinkSrc.EmitRow(context.Background(), nil, []byte("[1001]"), []byte("{\"after\":{\"col1\":\"val1\",\"rowid\":1001},\"key\":[1001],\"topic:\":\"foo\"}"), zeroTS, zeroTS, pool.alloc())) @@ -611,11 +610,10 @@ func TestWebhookSinkConfig(t *testing.T) { require.Equal(t, sinkDest.Latest(), "") testutils.SucceedsSoon(t, func() error { - // wait for the timer in batch worker to be set (1 hour from now, as specified by config) before advancing time. - if len(mt.Timers()) == 1 && mt.Timers()[0] == mt.Now().Add(time.Hour) { + if atomic.LoadInt32(&appendCount) >= 2 { return nil } - return errors.New("Waiting for timer to be created by batch worker") + return errors.New("Waiting for rows to be buffered") }) mt.Advance(time.Hour) require.NoError(t, sinkSrc.Flush(context.Background())) diff --git a/pkg/ccl/changefeedccl/sink_webhook_v2.go b/pkg/ccl/changefeedccl/sink_webhook_v2.go new file mode 100644 index 000000000000..6403c67c7a48 --- /dev/null +++ b/pkg/ccl/changefeedccl/sink_webhook_v2.go @@ -0,0 +1,381 @@ +// Copyright 2023 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "bytes" + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/util/admission" + "github.com/cockroachdb/cockroach/pkg/util/httputil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" +) + +const ( + applicationTypeJSON = `application/json` + applicationTypeCSV = `text/csv` + authorizationHeader = `Authorization` +) + +func isWebhookSink(u *url.URL) bool { + switch u.Scheme { + // allow HTTP here but throw an error later to make it clear HTTPS is required + case changefeedbase.SinkSchemeWebhookHTTP, changefeedbase.SinkSchemeWebhookHTTPS: + return true + default: + return false + } +} + +type webhookSinkClient struct { + ctx context.Context + format changefeedbase.FormatType + url sinkURL + authHeader string + batchCfg sinkBatchConfig + client *httputil.Client +} + +var _ SinkClient = (*webhookSinkClient)(nil) + +func makeWebhookSinkClient( + ctx context.Context, + u sinkURL, + encodingOpts changefeedbase.EncodingOptions, + opts changefeedbase.WebhookSinkOptions, + batchCfg sinkBatchConfig, + parallelism int, +) (SinkClient, error) { + err := validateWebhookOpts(u, encodingOpts, opts) + if err != nil { + return nil, err + } + + u.Scheme = strings.TrimPrefix(u.Scheme, `webhook-`) + + sinkClient := &webhookSinkClient{ + ctx: ctx, + authHeader: opts.AuthHeader, + format: encodingOpts.Format, + batchCfg: batchCfg, + } + + var connTimeout time.Duration + if opts.ClientTimeout != nil { + connTimeout = *opts.ClientTimeout + } + sinkClient.client, err = makeWebhookClient(u, connTimeout, parallelism) + if err != nil { + return nil, err + } + + // remove known query params from sink URL before setting in sink config + sinkURLParsed, err := url.Parse(u.String()) + if err != nil { + return nil, err + } + params := sinkURLParsed.Query() + params.Del(changefeedbase.SinkParamSkipTLSVerify) + params.Del(changefeedbase.SinkParamCACert) + params.Del(changefeedbase.SinkParamClientCert) + params.Del(changefeedbase.SinkParamClientKey) + sinkURLParsed.RawQuery = params.Encode() + sinkClient.url = sinkURL{URL: sinkURLParsed} + + return sinkClient, nil +} + +func makeWebhookClient( + u sinkURL, timeout time.Duration, parallelism int, +) (*httputil.Client, error) { + client := &httputil.Client{ + Client: &http.Client{ + Timeout: timeout, + Transport: &http.Transport{ + DialContext: (&net.Dialer{Timeout: timeout}).DialContext, + MaxConnsPerHost: parallelism, + MaxIdleConnsPerHost: parallelism, + IdleConnTimeout: time.Minute, + ForceAttemptHTTP2: true, + }, + }, + } + + dialConfig := struct { + tlsSkipVerify bool + caCert []byte + clientCert []byte + clientKey []byte + }{} + + transport := client.Transport.(*http.Transport) + + if _, err := u.consumeBool(changefeedbase.SinkParamSkipTLSVerify, &dialConfig.tlsSkipVerify); err != nil { + return nil, err + } + if err := u.decodeBase64(changefeedbase.SinkParamCACert, &dialConfig.caCert); err != nil { + return nil, err + } + if err := u.decodeBase64(changefeedbase.SinkParamClientCert, &dialConfig.clientCert); err != nil { + return nil, err + } + if err := u.decodeBase64(changefeedbase.SinkParamClientKey, &dialConfig.clientKey); err != nil { + return nil, err + } + + transport.TLSClientConfig = &tls.Config{ + 
InsecureSkipVerify: dialConfig.tlsSkipVerify, + } + + if dialConfig.caCert != nil { + caCertPool, err := x509.SystemCertPool() + if err != nil { + return nil, errors.Wrap(err, "could not load system root CA pool") + } + if caCertPool == nil { + caCertPool = x509.NewCertPool() + } + if !caCertPool.AppendCertsFromPEM(dialConfig.caCert) { + return nil, errors.Errorf("failed to parse certificate data:%s", string(dialConfig.caCert)) + } + transport.TLSClientConfig.RootCAs = caCertPool + } + + if dialConfig.clientCert != nil && dialConfig.clientKey == nil { + return nil, errors.Errorf(`%s requires %s to be set`, changefeedbase.SinkParamClientCert, changefeedbase.SinkParamClientKey) + } else if dialConfig.clientKey != nil && dialConfig.clientCert == nil { + return nil, errors.Errorf(`%s requires %s to be set`, changefeedbase.SinkParamClientKey, changefeedbase.SinkParamClientCert) + } + + if dialConfig.clientCert != nil && dialConfig.clientKey != nil { + cert, err := tls.X509KeyPair(dialConfig.clientCert, dialConfig.clientKey) + if err != nil { + return nil, errors.Wrap(err, `invalid client certificate data provided`) + } + transport.TLSClientConfig.Certificates = []tls.Certificate{cert} + } + + return client, nil +} + +func (sc *webhookSinkClient) makePayloadForBytes(body []byte) (SinkPayload, error) { + req, err := http.NewRequestWithContext(sc.ctx, http.MethodPost, sc.url.String(), bytes.NewReader(body)) + if err != nil { + return nil, err + } + switch sc.format { + case changefeedbase.OptFormatJSON: + req.Header.Set("Content-Type", applicationTypeJSON) + case changefeedbase.OptFormatCSV: + req.Header.Set("Content-Type", applicationTypeCSV) + } + + if sc.authHeader != "" { + req.Header.Set(authorizationHeader, sc.authHeader) + } + + return req, nil +} + +// MakeResolvedPayload implements the SinkClient interface +func (sc *webhookSinkClient) MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) { + return sc.makePayloadForBytes(body) +} + +// Flush implements the SinkClient interface +func (sc *webhookSinkClient) Flush(ctx context.Context, batch SinkPayload) error { + req := batch.(*http.Request) + res, err := sc.client.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + if !(res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusMultipleChoices) { + resBody, err := io.ReadAll(res.Body) + if err != nil { + return errors.Wrapf(err, "failed to read body for HTTP response with status: %d", res.StatusCode) + } + return fmt.Errorf("%s: %s", res.Status, string(resBody)) + } + return nil +} + +// Close implements the SinkClient interface +func (sc *webhookSinkClient) Close() error { + sc.client.CloseIdleConnections() + return nil +} + +func validateWebhookOpts( + u sinkURL, encodingOpts changefeedbase.EncodingOptions, opts changefeedbase.WebhookSinkOptions, +) error { + if u.Scheme != changefeedbase.SinkSchemeWebhookHTTPS { + return errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeHTTPS) + } + + switch encodingOpts.Format { + case changefeedbase.OptFormatJSON: + case changefeedbase.OptFormatCSV: + default: + return errors.Errorf(`this sink is incompatible with %s=%s`, + changefeedbase.OptFormat, encodingOpts.Format) + } + + switch encodingOpts.Envelope { + case changefeedbase.OptEnvelopeWrapped, changefeedbase.OptEnvelopeBare: + default: + return errors.Errorf(`this sink is incompatible with %s=%s`, + changefeedbase.OptEnvelope, encodingOpts.Envelope) + } + + encodingOpts.TopicInValue = true + + if encodingOpts.Envelope != 
changefeedbase.OptEnvelopeBare { + encodingOpts.KeyInValue = true + } + + return nil +} + +type webhookCSVBuffer struct { + bytes []byte + messageCount int + sc *webhookSinkClient +} + +var _ BatchBuffer = (*webhookCSVBuffer)(nil) + +// Append implements the BatchBuffer interface +func (cb *webhookCSVBuffer) Append(key []byte, value []byte, topic string) { + cb.bytes = append(cb.bytes, value...) + cb.messageCount += 1 +} + +// ShouldFlush implements the BatchBuffer interface +func (cb *webhookCSVBuffer) ShouldFlush() bool { + return cb.sc.shouldFlush(len(cb.bytes), cb.messageCount) +} + +// Close implements the BatchBuffer interface +func (cb *webhookCSVBuffer) Close() (SinkPayload, error) { + return cb.sc.makePayloadForBytes(cb.bytes) +} + +type webhookJSONBuffer struct { + messages [][]byte + numBytes int + sc *webhookSinkClient +} + +var _ BatchBuffer = (*webhookJSONBuffer)(nil) + +// Append implements the BatchBuffer interface +func (jb *webhookJSONBuffer) Append(key []byte, value []byte, topic string) { + jb.messages = append(jb.messages, value) + jb.numBytes += len(value) +} + +// ShouldFlush implements the BatchBuffer interface +func (jb *webhookJSONBuffer) ShouldFlush() bool { + return jb.sc.shouldFlush(jb.numBytes, len(jb.messages)) +} + +// Close implements the BatchBuffer interface +func (jb *webhookJSONBuffer) Close() (SinkPayload, error) { + var buffer bytes.Buffer + prefix := "{\"payload\":[" + suffix := fmt.Sprintf("],\"length\":%d}", len(jb.messages)) + + // Grow all at once to avoid reallocations + buffer.Grow(len(prefix) + jb.numBytes /* msgs */ + len(jb.messages) /* commas */ + len(suffix)) + + buffer.WriteString(prefix) + for i, msg := range jb.messages { + if i != 0 { + buffer.WriteByte(',') + } + buffer.Write(msg) + } + buffer.WriteString(suffix) + return jb.sc.makePayloadForBytes(buffer.Bytes()) +} + +// MakeBatchBuffer implements the SinkClient interface +func (sc *webhookSinkClient) MakeBatchBuffer() BatchBuffer { + if sc.format == changefeedbase.OptFormatCSV { + return &webhookCSVBuffer{sc: sc} + } else { + return &webhookJSONBuffer{ + sc: sc, + messages: make([][]byte, 0, sc.batchCfg.Messages), + } + } +} + +func (sc *webhookSinkClient) shouldFlush(bytes int, messages int) bool { + switch { + // all zero values is interpreted as flush every time + case sc.batchCfg.Messages == 0 && sc.batchCfg.Bytes == 0 && sc.batchCfg.Frequency == 0: + return true + // messages threshold has been reached + case sc.batchCfg.Messages > 0 && messages >= sc.batchCfg.Messages: + return true + // bytes threshold has been reached + case sc.batchCfg.Bytes > 0 && bytes >= sc.batchCfg.Bytes: + return true + default: + return false + } +} + +func makeWebhookSink( + ctx context.Context, + u sinkURL, + encodingOpts changefeedbase.EncodingOptions, + opts changefeedbase.WebhookSinkOptions, + parallelism int, + pacerFactory func() *admission.Pacer, + source timeutil.TimeSource, + mb metricsRecorderBuilder, +) (Sink, error) { + batchCfg, retryOpts, err := getSinkConfigFromJson(opts.JSONConfig) + if err != nil { + return nil, err + } + + sinkClient, err := makeWebhookSinkClient(ctx, u, encodingOpts, opts, batchCfg, parallelism) + if err != nil { + return nil, err + } + + return makeBatchingSink( + ctx, + sinkTypeWebhook, + sinkClient, + time.Duration(batchCfg.Frequency), + retryOpts, + parallelism, + nil, + pacerFactory, + source, + mb(requiresResourceAccounting), + ), nil +} diff --git a/pkg/ccl/changefeedccl/telemetry.go b/pkg/ccl/changefeedccl/telemetry.go index 1788b8789069..906b634a29d9 
100644 --- a/pkg/ccl/changefeedccl/telemetry.go +++ b/pkg/ccl/changefeedccl/telemetry.go @@ -136,6 +136,8 @@ type telemetryMetricsRecorder struct { inner metricsRecorder } +var _ metricsRecorder = (*telemetryMetricsRecorder)(nil) + func (r *telemetryMetricsRecorder) close() { r.telemetryLogger.close() } @@ -184,6 +186,14 @@ func (r *telemetryMetricsRecorder) recordSizeBasedFlush() { r.inner.recordSizeBasedFlush() } +func (r *telemetryMetricsRecorder) recordParallelIOQueueLatency(latency time.Duration) { + r.inner.recordParallelIOQueueLatency(latency) +} + +func (r *telemetryMetricsRecorder) recordSinkIOInflightChange(delta int64) { + r.inner.recordSinkIOInflightChange(delta) +} + // ContinuousTelemetryInterval determines the interval at which each node emits telemetry events // during the lifespan of each enterprise changefeed. var ContinuousTelemetryInterval = settings.RegisterDurationSetting( diff --git a/pkg/cmd/roachtest/grafana/configs/changefeed-roachtest-grafana-dashboard.json b/pkg/cmd/roachtest/grafana/configs/changefeed-roachtest-grafana-dashboard.json index 8b6d460720d4..15ab4bfab8ad 100644 --- a/pkg/cmd/roachtest/grafana/configs/changefeed-roachtest-grafana-dashboard.json +++ b/pkg/cmd/roachtest/grafana/configs/changefeed-roachtest-grafana-dashboard.json @@ -1141,7 +1141,7 @@ "h": 7, "w": 6, "x": 0, - "y": 24 + "y": 40 }, "id": 18, "options": { @@ -1260,7 +1260,7 @@ "h": 7, "w": 6, "x": 6, - "y": 24 + "y": 40 }, "id": 57, "options": { @@ -1353,7 +1353,7 @@ "h": 7, "w": 5, "x": 12, - "y": 24 + "y": 40 }, "id": 72, "options": { @@ -1459,7 +1459,7 @@ "h": 7, "w": 6, "x": 0, - "y": 27 + "y": 43 }, "id": 35, "options": { @@ -1564,7 +1564,7 @@ "h": 7, "w": 6, "x": 6, - "y": 27 + "y": 43 }, "id": 33, "options": { @@ -1657,7 +1657,7 @@ "h": 7, "w": 6, "x": 12, - "y": 27 + "y": 43 }, "id": 39, "options": { @@ -1748,8 +1748,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -1765,7 +1764,7 @@ "h": 6, "w": 4, "x": 0, - "y": 20 + "y": 36 }, "id": 63, "options": { @@ -1868,8 +1867,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -1885,7 +1883,7 @@ "h": 6, "w": 4, "x": 4, - "y": 20 + "y": 36 }, "id": 64, "options": { @@ -1988,8 +1986,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -2005,7 +2002,7 @@ "h": 6, "w": 4, "x": 8, - "y": 20 + "y": 36 }, "id": 65, "options": { @@ -2084,8 +2081,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -2101,7 +2097,7 @@ "h": 6, "w": 4, "x": 12, - "y": 20 + "y": 36 }, "id": 75, "options": { @@ -2179,8 +2175,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -2196,7 +2191,7 @@ "h": 6, "w": 4, "x": 16, - "y": 20 + "y": 36 }, "id": 19, "options": { @@ -2233,7 +2228,7 @@ "type": "row" }, { - "collapsed": true, + "collapsed": false, "gridPos": { "h": 1, "w": 24, @@ -2241,474 +2236,660 @@ "y": 20 }, "id": 14, - "panels": [ - { - "datasource": { - "type": "prometheus", - "uid": "localprom" + "panels": [], + "title": "Sink", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "localprom" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisCenteredZero": false, - "axisColorMode": 
"text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } - ] - }, - "unit": "ns" + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 6, - "x": 0, - "y": 21 - }, - "id": 54, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" }, - "tooltip": { - "mode": "single", - "sort": "none" + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" } }, - "pluginVersion": "9.2.3", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "localprom" - }, - "editorMode": "code", - "expr": "histogram_quantile(0.50, sum(rate(changefeed_sink_batch_hist_nanos_bucket[$__rate_interval])) by (le))", - "hide": false, - "legendFormat": "p50 {{label_name}}", - "range": true, - "refId": "C" - }, - { - "datasource": { - "type": "prometheus", - "uid": "localprom" - }, - "editorMode": "code", - "expr": "histogram_quantile(0.75, sum(rate(changefeed_sink_batch_hist_nanos_bucket[$__rate_interval])) by (le))", - "hide": false, - "legendFormat": "p75 {{label_name}}", - "range": true, - "refId": "D" - }, - { - "datasource": { - "type": "prometheus", - "uid": "localprom" + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null }, - "editorMode": "code", - "expr": "histogram_quantile(0.95, sum(rate(changefeed_sink_batch_hist_nanos_bucket[$__rate_interval])) by (le))", - "hide": false, - "legendFormat": "p95 {{label_name}}", - "range": true, - "refId": "E" - } - ], - "title": "Sink Buffer Time Batched", - "type": "timeseries" + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "ns" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 6, + "x": 0, + "y": 21 + }, + "id": 54, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "9.2.3", + "targets": [ { "datasource": { "type": "prometheus", "uid": "localprom" }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", 
- "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } - ] - }, - "unit": "ns" - }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 5, - "x": 6, - "y": 21 + "editorMode": "code", + "expr": "histogram_quantile(0.50, sum(rate(changefeed_sink_batch_hist_nanos_bucket[$__rate_interval])) by (le))", + "hide": false, + "legendFormat": "p50 {{label_name}}", + "range": true, + "refId": "C" + }, + { + "datasource": { + "type": "prometheus", + "uid": "localprom" }, - "id": 55, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true + "editorMode": "code", + "expr": "histogram_quantile(0.75, sum(rate(changefeed_sink_batch_hist_nanos_bucket[$__rate_interval])) by (le))", + "hide": false, + "legendFormat": "p75 {{label_name}}", + "range": true, + "refId": "D" + }, + { + "datasource": { + "type": "prometheus", + "uid": "localprom" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum(rate(changefeed_sink_batch_hist_nanos_bucket[$__rate_interval])) by (le))", + "hide": false, + "legendFormat": "p95 {{label_name}}", + "range": true, + "refId": "E" + } + ], + "title": "Sink Buffer Time Batched", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "localprom" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false }, - "tooltip": { - "mode": "single", - "sort": "none" + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" } }, - "pluginVersion": "9.2.3", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "localprom" + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null }, - "editorMode": "code", - "expr": "histogram_quantile(0.50, sum(rate(changefeed_flush_hist_nanos_bucket[$__rate_interval])) by (le))", - "hide": false, - "legendFormat": "p50 {{label_name}}", - "range": true, - "refId": "C" + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "ns" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 5, + "x": 6, + "y": 21 + }, + "id": 55, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "9.2.3", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "localprom" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.50, sum(rate(changefeed_flush_hist_nanos_bucket[$__rate_interval])) by (le))", + "hide": false, + "legendFormat": "p50 {{label_name}}", + "range": true, + "refId": "C" + }, + { + "datasource": { + "type": "prometheus", + "uid": "localprom" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.75, sum(rate(changefeed_flush_hist_nanos_bucket[$__rate_interval])) by (le))", + "hide": false, + "legendFormat": 
"p75 {{label_name}}", + "range": true, + "refId": "D" + }, + { + "datasource": { + "type": "prometheus", + "uid": "localprom" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum(rate(changefeed_flush_hist_nanos_bucket[$__rate_interval])) by (le))", + "hide": false, + "legendFormat": "p95 {{label_name}}", + "range": true, + "refId": "E" + } + ], + "title": "Sink Flush Time", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "localprom" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false }, - { - "datasource": { - "type": "prometheus", - "uid": "localprom" - }, - "editorMode": "code", - "expr": "histogram_quantile(0.75, sum(rate(changefeed_flush_hist_nanos_bucket[$__rate_interval])) by (le))", - "hide": false, - "legendFormat": "p75 {{label_name}}", - "range": true, - "refId": "D" + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" }, - { - "datasource": { - "type": "prometheus", - "uid": "localprom" - }, - "editorMode": "code", - "expr": "histogram_quantile(0.95, sum(rate(changefeed_flush_hist_nanos_bucket[$__rate_interval])) by (le))", - "hide": false, - "legendFormat": "p95 {{label_name}}", - "range": true, - "refId": "E" + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" } - ], - "title": "Sink Flush Time", - "type": "timeseries" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "cps" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 5, + "x": 11, + "y": 21 + }, + "id": 60, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": false }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "9.2.3", + "targets": [ { "datasource": { "type": "prometheus", "uid": "localprom" }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } - ] - }, - "unit": "cps" + "editorMode": "code", + "expr": "sum(rate(changefeed_flushes[$__rate_interval]))", + "hide": false, + "legendFormat": "total_flushes", + "range": true, + "refId": "E" + }, + { + "datasource": { + "type": "prometheus", + "uid": "localprom" + }, + "editorMode": "code", + "expr": "sum(rate(changefeed_size_based_flushes[$__rate_interval]))", + "hide": false, + "legendFormat": "size_based_flushes", + "range": 
true, + "refId": "A" + } + ], + "title": "Sink Flush Rate", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "localprom" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false }, - "overrides": [] + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } }, - "gridPos": { - "h": 6, - "w": 5, - "x": 11, - "y": 21 + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] }, - "id": 60, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true + "unit": "ns" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 5, + "x": 16, + "y": 21 + }, + "id": 25, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "9.2.3", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "localprom" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.50, sum(rate(changefeed_checkpoint_hist_nanos_bucket[5m])) by (le))", + "hide": false, + "legendFormat": "p50 {{label_name}}", + "range": true, + "refId": "C" + }, + { + "datasource": { + "type": "prometheus", + "uid": "localprom" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.75, sum(rate(changefeed_checkpoint_hist_nanos_bucket[5m])) by (le))", + "hide": false, + "legendFormat": "p75 {{label_name}}", + "range": true, + "refId": "D" + }, + { + "datasource": { + "type": "prometheus", + "uid": "localprom" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum(rate(changefeed_checkpoint_hist_nanos_bucket[5m])) by (le))", + "hide": false, + "legendFormat": "p95 {{label_name}}", + "range": true, + "refId": "E" + } + ], + "title": "Time Spent Checkpointing", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "localprom" + }, + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" }, - "tooltip": { - "mode": "single", - "sort": "none" + "thresholdsStyle": { + "mode": "off" } }, - "pluginVersion": "9.2.3", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "localprom" - }, - "editorMode": "code", - "expr": "sum(rate(changefeed_flushes[$__rate_interval]))", - "hide": false, - "legendFormat": "total_flushes", - "range": true, - "refId": "E" - }, - { - "datasource": { - "type": "prometheus", - "uid": 
"localprom" + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null }, - "editorMode": "code", - "expr": "sum(rate(changefeed_size_based_flushes[$__rate_interval]))", - "hide": false, - "legendFormat": "size_based_flushes", - "range": true, - "refId": "A" - } - ], - "title": "Sink Flush Rate", - "type": "timeseries" + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 6, + "x": 0, + "y": 27 + }, + "id": 77, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ { "datasource": { "type": "prometheus", "uid": "localprom" }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } - ] - }, - "unit": "ns" - }, - "overrides": [] - }, - "gridPos": { - "h": 6, - "w": 5, - "x": 16, - "y": 21 + "editorMode": "code", + "expr": "sum by(scope) (rate(changefeed_sink_io_inflight{scope!=\"\"}[$__rate_interval]))", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Batches currently in IO", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "localprom" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" }, - "id": 25, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "pluginVersion": "9.2.3", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "localprom" - }, - "editorMode": "code", - "expr": "histogram_quantile(0.50, sum(rate(changefeed_checkpoint_hist_nanos_bucket[5m])) by (le))", - "hide": false, - "legendFormat": "p50 {{label_name}}", - "range": true, - "refId": "C" + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" }, - { - "datasource": { - "type": "prometheus", - "uid": "localprom" - }, - "editorMode": "code", - "expr": "histogram_quantile(0.75, sum(rate(changefeed_checkpoint_hist_nanos_bucket[5m])) by (le))", - "hide": false, - "legendFormat": "p75 {{label_name}}", - "range": true, - "refId": "D" + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" }, - { - "datasource": { - "type": "prometheus", - "uid": "localprom" - }, - "editorMode": "code", - "expr": "histogram_quantile(0.95, 
sum(rate(changefeed_checkpoint_hist_nanos_bucket[5m])) by (le))", - "hide": false, - "legendFormat": "p95 {{label_name}}", - "range": true, - "refId": "E" + "thresholdsStyle": { + "mode": "off" } - ], - "title": "Time Spent Checkpointing", - "type": "timeseries" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "ns" + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 5, + "x": 6, + "y": 27 + }, + "id": 80, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "localprom" + }, + "editorMode": "code", + "expr": "sum by (scope) ((rate(changefeed_parallel_io_queue_nanos_sum{scope!=\"\"}[5m]) / rate(changefeed_parallel_io_queue_nanos_count{scope!=\"\"}[5m])))", + "legendFormat": "{{scope}}", + "range": true, + "refId": "A" } ], - "title": "Sink", - "type": "row" + "title": "Parallel IO Average Queue Time", + "type": "timeseries" }, { "collapsed": true, @@ -2716,7 +2897,7 @@ "h": 1, "w": 24, "x": 0, - "y": 21 + "y": 34 }, "id": 16, "panels": [ @@ -2766,8 +2947,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -2783,7 +2963,7 @@ "h": 8, "w": 8, "x": 0, - "y": 22 + "y": 38 }, "id": 22, "options": { @@ -2860,8 +3040,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -2877,7 +3056,7 @@ "h": 8, "w": 10, "x": 8, - "y": 22 + "y": 38 }, "id": 24, "options": { @@ -2954,8 +3133,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -2971,7 +3149,7 @@ "h": 8, "w": 8, "x": 0, - "y": 30 + "y": 46 }, "id": 23, "options": { @@ -3048,8 +3226,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -3065,7 +3242,7 @@ "h": 8, "w": 5, "x": 8, - "y": 30 + "y": 46 }, "id": 50, "options": { @@ -3141,8 +3318,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -3158,7 +3334,7 @@ "h": 8, "w": 5, "x": 13, - "y": 30 + "y": 46 }, "id": 51, "options": { @@ -3200,7 +3376,7 @@ "h": 1, "w": 24, "x": 0, - "y": 22 + "y": 35 }, "id": 59, "panels": [], @@ -3213,7 +3389,7 @@ "h": 1, "w": 24, "x": 0, - "y": 23 + "y": 36 }, "id": 27, "panels": [ @@ -3262,8 +3438,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -3278,7 +3453,7 @@ "h": 7, "w": 8, "x": 0, - "y": 24 + "y": 40 }, "id": 31, "options": { diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index dd1dc9da998c..83c25cf37c0c 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -1235,6 +1235,7 @@ func registerCDC(r registry.Registry) { ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "30m"}) + // The deprecated webhook sink is unable to handle the throughput required for 100 warehouses if _, err := ct.DB().Exec("SET CLUSTER SETTING changefeed.new_webhook_sink_enabled = true;"); err != nil { ct.t.Fatal(err) } diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 7c4174022d0a..684a93288cbf 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ 
-1570,6 +1570,7 @@ var charts = []sectionDescription{ "changefeed.checkpoint_hist_nanos", "changefeed.flush_hist_nanos", "changefeed.sink_batch_hist_nanos", + "changefeed.parallel_io_queue_nanos", }, }, { @@ -1594,6 +1595,12 @@ var charts = []sectionDescription{ "changefeed.flush.messages_pushback_nanos", }, }, + { + Title: "Sink IO", + Metrics: []string{ + "changefeed.sink_io_inflight", + }, + }, { Title: "Batching", Metrics: []string{