diff --git a/pkg/ccl/changefeedccl/cdcutils/BUILD.bazel b/pkg/ccl/changefeedccl/cdcutils/BUILD.bazel index b9ecdcb735c3..993c43479b4e 100644 --- a/pkg/ccl/changefeedccl/cdcutils/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdcutils/BUILD.bazel @@ -9,7 +9,9 @@ go_library( "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/settings", "//pkg/util/log", + "//pkg/util/metric", "//pkg/util/quotapool", + "//pkg/util/timeutil", "//pkg/util/tracing", ], ) diff --git a/pkg/ccl/changefeedccl/cdcutils/throttle.go b/pkg/ccl/changefeedccl/cdcutils/throttle.go index febb17a3b5c5..5b4ef59ce831 100644 --- a/pkg/ccl/changefeedccl/cdcutils/throttle.go +++ b/pkg/ccl/changefeedccl/cdcutils/throttle.go @@ -19,7 +19,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/quotapool" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" ) @@ -29,6 +31,7 @@ type Throttler struct { messageLimiter *quotapool.RateLimiter byteLimiter *quotapool.RateLimiter flushLimiter *quotapool.RateLimiter + metrics *Metrics } // AcquireMessageQuota acquires quota for a message with the specified size. @@ -43,10 +46,10 @@ func (t *Throttler) AcquireMessageQuota(ctx context.Context, sz int) error { ctx, span = tracing.ChildSpan(ctx, fmt.Sprintf("quota-wait-%s", t.name)) defer span.Finish() - if err := t.messageLimiter.WaitN(ctx, 1); err != nil { + if err := waitQuota(ctx, 1, t.messageLimiter, t.metrics.MessagesPushbackNanos); err != nil { return err } - return t.byteLimiter.WaitN(ctx, int64(sz)) + return waitQuota(ctx, int64(sz), t.byteLimiter, t.metrics.BytesPushbackNanos) } // AcquireFlushQuota acquires quota for a message with the specified size. 
@@ -60,8 +63,7 @@ func (t *Throttler) AcquireFlushQuota(ctx context.Context) error { var span *tracing.Span ctx, span = tracing.ChildSpan(ctx, fmt.Sprintf("quota-wait-flush-%s", t.name)) defer span.Finish() - - return t.flushLimiter.WaitN(ctx, 1) + return waitQuota(ctx, 1, t.flushLimiter, t.metrics.FlushPushbackNanos) } func (t *Throttler) updateConfig(config changefeedbase.SinkThrottleConfig) { @@ -85,7 +87,7 @@ func (t *Throttler) updateConfig(config changefeedbase.SinkThrottleConfig) { } // NewThrottler creates a new throttler with the specified configuration. -func NewThrottler(name string, config changefeedbase.SinkThrottleConfig) *Throttler { +func NewThrottler(name string, config changefeedbase.SinkThrottleConfig, m *Metrics) *Throttler { logSlowAcquisition := quotapool.OnSlowAcquisition(500*time.Millisecond, quotapool.LogSlowAcquisition) t := &Throttler{ name: name, @@ -98,6 +100,7 @@ func NewThrottler(name string, config changefeedbase.SinkThrottleConfig) *Thrott flushLimiter: quotapool.NewRateLimiter( fmt.Sprintf("%s-flushes", name), 0, 0, logSlowAcquisition, ), + metrics: m, } t.updateConfig(config) return t @@ -109,7 +112,7 @@ var nodeSinkThrottle = struct { }{} // NodeLevelThrottler returns node level Throttler for changefeeds. -func NodeLevelThrottler(sv *settings.Values) *Throttler { +func NodeLevelThrottler(sv *settings.Values, metrics *Metrics) *Throttler { getConfig := func() (config changefeedbase.SinkThrottleConfig) { configStr := changefeedbase.NodeSinkThrottleConfig.Get(sv) if configStr != "" { @@ -126,7 +129,7 @@ func NodeLevelThrottler(sv *settings.Values) *Throttler { if nodeSinkThrottle.Throttler != nil { panic("unexpected state") } - nodeSinkThrottle.Throttler = NewThrottler("cf.node.throttle", getConfig()) + nodeSinkThrottle.Throttler = NewThrottler("cf.node.throttle", getConfig(), metrics) // Update node throttler configs when settings change. 
changefeedbase.NodeSinkThrottleConfig.SetOnChange(sv, func(ctx context.Context) { nodeSinkThrottle.Throttler.updateConfig(getConfig()) @@ -135,3 +138,43 @@ func NodeLevelThrottler(sv *settings.Values) *Throttler { return nodeSinkThrottle.Throttler } + +// Metrics is a metric.Struct for changefeed throttling metrics. +type Metrics struct { + BytesPushbackNanos *metric.Counter + MessagesPushbackNanos *metric.Counter + FlushPushbackNanos *metric.Counter +} + +// MakeMetrics constructs a Metrics struct. histogramWindow is currently unused: all metrics here are counters. +func MakeMetrics(histogramWindow time.Duration) Metrics { + makeMetric := func(n string) metric.Metadata { + return metric.Metadata{ + Name: fmt.Sprintf("changefeed.%s.messages_pushback_nanos", n), + Help: fmt.Sprintf("Total time spent throttled for %s quota", n), + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } + } + + return Metrics{ + BytesPushbackNanos: metric.NewCounter(makeMetric("bytes")), + MessagesPushbackNanos: metric.NewCounter(makeMetric("messages")), + FlushPushbackNanos: metric.NewCounter(makeMetric("flush")), + } +} + +var _ metric.Struct = (*Metrics)(nil) + +// MetricStruct makes Metrics a metric.Struct. 
+func (m Metrics) MetricStruct() {} + +func waitQuota( + ctx context.Context, n int64, limit *quotapool.RateLimiter, c *metric.Counter, +) error { + start := timeutil.Now() + defer func() { + c.Inc(int64(timeutil.Now().Sub(start))) + }() + return limit.WaitN(ctx, n) +} diff --git a/pkg/ccl/changefeedccl/cdcutils/throttle_test.go b/pkg/ccl/changefeedccl/cdcutils/throttle_test.go index 0ef6769889ef..9a0517752e36 100644 --- a/pkg/ccl/changefeedccl/cdcutils/throttle_test.go +++ b/pkg/ccl/changefeedccl/cdcutils/throttle_test.go @@ -11,6 +11,7 @@ package cdcutils import ( "context" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -24,8 +25,8 @@ func TestNodeLevelThrottler(t *testing.T) { defer log.Scope(t).Close(t) sv := &cluster.MakeTestingClusterSettings().SV - - throttler := NodeLevelThrottler(sv) + m := MakeMetrics(time.Minute) + throttler := NodeLevelThrottler(sv, &m) // Default: no throttling require.True(t, throttler.messageLimiter.AdmitN(10000000)) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index b98a8aa799aa..ddb6c2061a2a 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -281,7 +281,7 @@ func (ca *changeAggregator) startKVFeed( cfg := ca.flowCtx.Cfg buf := kvevent.NewThrottlingBuffer( kvevent.NewMemBuffer(ca.kvFeedMemMon.MakeBoundAccount(), &cfg.Settings.SV, &ca.metrics.KVFeedMetrics), - cdcutils.NodeLevelThrottler(&cfg.Settings.SV)) + cdcutils.NodeLevelThrottler(&cfg.Settings.SV, &ca.metrics.ThrottleMetrics)) // KVFeed takes ownership of the kvevent.Writer portion of the buffer, while // we return the kvevent.Reader part to the caller. 
diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index 6f7857f339ab..9e18a2729376 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -12,6 +12,7 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -233,6 +234,7 @@ type Metrics struct { Running *metric.Gauge FrontierUpdates *metric.Counter + ThrottleMetrics cdcutils.Metrics mu struct { syncutil.Mutex @@ -270,7 +272,9 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct { Running: metric.NewGauge(metaChangefeedRunning), FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates), + ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow), } + m.mu.resolved = make(map[int]hlc.Timestamp) m.mu.id = 1 // start the first id at 1 so we can detect initialization m.MaxBehindNanos = metric.NewFunctionalGauge(metaChangefeedMaxBehindNanos, func() int64 {