Skip to content

Commit

Permalink
changefeedccl: Add metrics to changefeed throttle.
Browse files Browse the repository at this point in the history
Add metrics to changefeed traffic throttler.

Release Justification: Small observability changes to the existing functionality.
Release Notes: None
  • Loading branch information
Yevgeniy Miretskiy committed Sep 30, 2021
1 parent f4a1848 commit f2f824c
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 10 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/cdcutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ go_library(
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/settings",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/quotapool",
"//pkg/util/timeutil",
"//pkg/util/tracing",
],
)
Expand Down
57 changes: 50 additions & 7 deletions pkg/ccl/changefeedccl/cdcutils/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

Expand All @@ -29,6 +31,7 @@ type Throttler struct {
messageLimiter *quotapool.RateLimiter
byteLimiter *quotapool.RateLimiter
flushLimiter *quotapool.RateLimiter
metrics *Metrics
}

// AcquireMessageQuota acquires quota for a message with the specified size.
Expand All @@ -43,10 +46,10 @@ func (t *Throttler) AcquireMessageQuota(ctx context.Context, sz int) error {
ctx, span = tracing.ChildSpan(ctx, fmt.Sprintf("quota-wait-%s", t.name))
defer span.Finish()

if err := t.messageLimiter.WaitN(ctx, 1); err != nil {
if err := waitQuota(ctx, 1, t.messageLimiter, t.metrics.MessagesPushbackNanos); err != nil {
return err
}
return t.byteLimiter.WaitN(ctx, int64(sz))
return waitQuota(ctx, int64(sz), t.byteLimiter, t.metrics.BytesPushbackNanos)
}

// AcquireFlushQuota acquires quota for a message with the specified size.
Expand All @@ -60,8 +63,7 @@ func (t *Throttler) AcquireFlushQuota(ctx context.Context) error {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, fmt.Sprintf("quota-wait-flush-%s", t.name))
defer span.Finish()

return t.flushLimiter.WaitN(ctx, 1)
return waitQuota(ctx, 1, t.flushLimiter, t.metrics.FlushPushbackNanos)
}

func (t *Throttler) updateConfig(config changefeedbase.SinkThrottleConfig) {
Expand All @@ -85,7 +87,7 @@ func (t *Throttler) updateConfig(config changefeedbase.SinkThrottleConfig) {
}

// NewThrottler creates a new throttler with the specified configuration.
func NewThrottler(name string, config changefeedbase.SinkThrottleConfig) *Throttler {
func NewThrottler(name string, config changefeedbase.SinkThrottleConfig, m *Metrics) *Throttler {
logSlowAcquisition := quotapool.OnSlowAcquisition(500*time.Millisecond, quotapool.LogSlowAcquisition)
t := &Throttler{
name: name,
Expand All @@ -98,6 +100,7 @@ func NewThrottler(name string, config changefeedbase.SinkThrottleConfig) *Thrott
flushLimiter: quotapool.NewRateLimiter(
fmt.Sprintf("%s-flushes", name), 0, 0, logSlowAcquisition,
),
metrics: m,
}
t.updateConfig(config)
return t
Expand All @@ -109,7 +112,7 @@ var nodeSinkThrottle = struct {
}{}

// NodeLevelThrottler returns node level Throttler for changefeeds.
func NodeLevelThrottler(sv *settings.Values) *Throttler {
func NodeLevelThrottler(sv *settings.Values, metrics *Metrics) *Throttler {
getConfig := func() (config changefeedbase.SinkThrottleConfig) {
configStr := changefeedbase.NodeSinkThrottleConfig.Get(sv)
if configStr != "" {
Expand All @@ -126,7 +129,7 @@ func NodeLevelThrottler(sv *settings.Values) *Throttler {
if nodeSinkThrottle.Throttler != nil {
panic("unexpected state")
}
nodeSinkThrottle.Throttler = NewThrottler("cf.node.throttle", getConfig())
nodeSinkThrottle.Throttler = NewThrottler("cf.node.throttle", getConfig(), metrics)
// Update node throttler configs when settings change.
changefeedbase.NodeSinkThrottleConfig.SetOnChange(sv, func(ctx context.Context) {
nodeSinkThrottle.Throttler.updateConfig(getConfig())
Expand All @@ -135,3 +138,43 @@ func NodeLevelThrottler(sv *settings.Values) *Throttler {

return nodeSinkThrottle.Throttler
}

// Metrics is a metric.Struct for kvfeed metrics.
type Metrics struct {
BytesPushbackNanos *metric.Counter
MessagesPushbackNanos *metric.Counter
FlushPushbackNanos *metric.Counter
}

// MakeMetrics constructs a Metrics struct with the provided histogram window.
func MakeMetrics(histogramWindow time.Duration) Metrics {
makeMetric := func(n string) metric.Metadata {
return metric.Metadata{
Name: fmt.Sprintf("changefeed.%s.messages_pushback_nanos", n),
Help: fmt.Sprintf("Total time spent throttled for %s quota", n),
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
}

return Metrics{
BytesPushbackNanos: metric.NewCounter(makeMetric("bytes")),
MessagesPushbackNanos: metric.NewCounter(makeMetric("messages")),
FlushPushbackNanos: metric.NewCounter(makeMetric("flush")),
}
}

var _ metric.Struct = (*Metrics)(nil)

// MetricStruct makes Metrics a metric.Struct.
func (m Metrics) MetricStruct() {}

func waitQuota(
ctx context.Context, n int64, limit *quotapool.RateLimiter, c *metric.Counter,
) error {
start := timeutil.Now()
defer func() {
c.Inc(int64(timeutil.Now().Sub(start)))
}()
return limit.WaitN(ctx, n)
}
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/cdcutils/throttle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package cdcutils
import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand All @@ -24,8 +25,8 @@ func TestNodeLevelThrottler(t *testing.T) {
defer log.Scope(t).Close(t)

sv := &cluster.MakeTestingClusterSettings().SV

throttler := NodeLevelThrottler(sv)
m := MakeMetrics(time.Minute)
throttler := NodeLevelThrottler(sv, &m)

// Default: no throttling
require.True(t, throttler.messageLimiter.AdmitN(10000000))
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (ca *changeAggregator) startKVFeed(
cfg := ca.flowCtx.Cfg
buf := kvevent.NewThrottlingBuffer(
kvevent.NewMemBuffer(ca.kvFeedMemMon.MakeBoundAccount(), &cfg.Settings.SV, &ca.metrics.KVFeedMetrics),
cdcutils.NodeLevelThrottler(&cfg.Settings.SV))
cdcutils.NodeLevelThrottler(&cfg.Settings.SV, &ca.metrics.ThrottleMetrics))

// KVFeed takes ownership of the kvevent.Writer portion of the buffer, while
// we return the kvevent.Reader part to the caller.
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
"github.com/cockroachdb/cockroach/pkg/jobs"
Expand Down Expand Up @@ -233,6 +234,7 @@ type Metrics struct {
Running *metric.Gauge

FrontierUpdates *metric.Counter
ThrottleMetrics cdcutils.Metrics

mu struct {
syncutil.Mutex
Expand Down Expand Up @@ -270,7 +272,9 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {

Running: metric.NewGauge(metaChangefeedRunning),
FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow),
}

m.mu.resolved = make(map[int]hlc.Timestamp)
m.mu.id = 1 // start the first id at 1 so we can detect initialization
m.MaxBehindNanos = metric.NewFunctionalGauge(metaChangefeedMaxBehindNanos, func() int64 {
Expand Down

0 comments on commit f2f824c

Please sign in to comment.