diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel
index bd741adb26bc..126c615ad653 100644
--- a/pkg/ccl/changefeedccl/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/BUILD.bazel
@@ -11,7 +11,6 @@ go_library(
         "doc.go",
         "encoder.go",
         "metrics.go",
-        "metrics_scope.go",
         "name.go",
         "rowfetcher_cache.go",
         "schema_registry.go",
@@ -84,6 +83,7 @@ go_library(
         "//pkg/util/ctxgroup",
         "//pkg/util/duration",
         "//pkg/util/encoding",
+        "//pkg/util/envutil",
         "//pkg/util/errorutil",
         "//pkg/util/hlc",
         "//pkg/util/httputil",
@@ -92,6 +92,7 @@ go_library(
         "//pkg/util/log",
         "//pkg/util/log/logcrash",
         "//pkg/util/metric",
+        "//pkg/util/metric/aggmetric",
        "//pkg/util/mon",
         "//pkg/util/protoutil",
         "//pkg/util/retry",
@@ -123,7 +124,6 @@ go_test(
         "helpers_tenant_shim_test.go",
         "helpers_test.go",
         "main_test.go",
-        "metrics_scope_test.go",
         "name_test.go",
         "nemeses_test.go",
         "schema_registry_test.go",
@@ -202,7 +202,6 @@ go_test(
         "//pkg/util/json",
         "//pkg/util/leaktest",
         "//pkg/util/log",
-        "//pkg/util/metric",
         "//pkg/util/mon",
         "//pkg/util/protoutil",
         "//pkg/util/randutil",
diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index 1dec073fe4ab..d987ebc9d40b 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -90,8 +90,9 @@ type changeAggregator struct {
 	// boundary information.
 	frontier *schemaChangeFrontier

-	metrics *Metrics
-	knobs   TestingKnobs
+	metrics    *Metrics
+	sliMetrics *sliMetrics
+	knobs      TestingKnobs
 }

 type timestampLowerBoundOracle interface {
@@ -245,9 +246,15 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	// runs. They're all stored as the `metric.Struct` interface because of
 	// dependency cycles.
 	ca.metrics = ca.flowCtx.Cfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics)
-	sm := &sinkMetrics{SinkMetrics: ca.metrics.SinkMetrics}
+	ca.sliMetrics, err = ca.metrics.getSLIMetrics(ca.spec.Feed.Opts[changefeedbase.OptMetricsScope])
+	if err != nil {
+		ca.MoveToDraining(err)
+		ca.cancel()
+		return
+	}
+
 	ca.sink, err = getSink(ctx, ca.flowCtx.Cfg, ca.spec.Feed, timestampOracle,
-		ca.spec.User(), ca.spec.JobID, sm)
+		ca.spec.User(), ca.spec.JobID, ca.sliMetrics)

 	if err != nil {
 		err = changefeedbase.MarkRetryableError(err)
@@ -265,7 +272,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {

 	ca.sink = &errorWrapperSink{wrapped: ca.sink}

-	ca.eventProducer, err = ca.startKVFeed(ctx, spans, initialHighWater, needsInitialScan, sm)
+	ca.eventProducer, err = ca.startKVFeed(ctx, spans, initialHighWater, needsInitialScan, ca.sliMetrics)
 	if err != nil {
 		// Early abort in the case that there is an error creating the sink.
 		ca.MoveToDraining(err)
@@ -287,7 +294,7 @@ func (ca *changeAggregator) startKVFeed(
 	spans []roachpb.Span,
 	initialHighWater hlc.Timestamp,
 	needsInitialScan bool,
-	sm *sinkMetrics,
+	sm *sliMetrics,
 ) (kvevent.Reader, error) {
 	cfg := ca.flowCtx.Cfg
 	buf := kvevent.NewThrottlingBuffer(
@@ -327,7 +334,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
 	buf kvevent.Writer,
 	initialHighWater hlc.Timestamp,
 	needsInitialScan bool,
-	sm *sinkMetrics,
+	sm *sliMetrics,
 ) kvfeed.Config {
 	schemaChangeEvents := changefeedbase.SchemaChangeEventClass(
 		ca.spec.Feed.Opts[changefeedbase.OptSchemaChangeEvents])
@@ -344,16 +351,6 @@ func (ca *changeAggregator) makeKVFeedCfg(
 			initialHighWater, &ca.metrics.SchemaFeedMetrics)
 	}

-	onBackfillCb := func(backfillTS hlc.Timestamp) {
-		if backfillTS.IsEmpty() {
-			sm.backfilling.Set(false)
-			ca.metrics.BackfillCount.Dec(1)
-		} else {
-			sm.backfilling.Set(true)
-			ca.metrics.BackfillCount.Inc(1)
-		}
-	}
-
 	return kvfeed.Config{
 		Writer:   buf,
 		Settings: cfg.Settings,
@@ -365,7 +362,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
 		BackfillCheckpoint: ca.spec.Checkpoint.Spans,
 		Targets:            ca.spec.Feed.Targets,
 		Metrics:            &ca.metrics.KVFeedMetrics,
-		OnBackfillCallback: onBackfillCb,
+		OnBackfillCallback: ca.sliMetrics.getBackfillCallback(),
 		MM:                 ca.kvFeedMemMon,
 		InitialHighWater:   initialHighWater,
 		WithDiff:           withDiff,
@@ -518,13 +515,12 @@ func (ca *changeAggregator) tick() error {
 		queuedNanos := timeutil.Since(event.BufferAddTimestamp()).Nanoseconds()
 		ca.metrics.QueueTimeNanos.Inc(queuedNanos)

-	// Keep track of SLI latency for non-backfill/rangefeed KV events.
-	if event.Type() == kvevent.TypeKV && event.BackfillTimestamp().IsEmpty() {
-		ca.metrics.AdmitLatency.RecordValue(timeutil.Since(event.Timestamp().GoTime()).Nanoseconds())
-	}
-
 	switch event.Type() {
 	case kvevent.TypeKV:
+		// Keep track of SLI latency for non-backfill/rangefeed KV events.
+		if event.BackfillTimestamp().IsEmpty() {
+			ca.sliMetrics.AdmitLatency.RecordValue(timeutil.Since(event.Timestamp().GoTime()).Nanoseconds())
+		}
 		return ca.eventConsumer.ConsumeEvent(ca.Ctx, event)
 	case kvevent.TypeResolved:
 		a := event.DetachAlloc()
@@ -1108,9 +1104,13 @@ func (cf *changeFrontier) Start(ctx context.Context) {
 	// but the oracle is only used when emitting row updates.
 	var nilOracle timestampLowerBoundOracle
 	var err error
-	sm := &sinkMetrics{SinkMetrics: cf.metrics.SinkMetrics}
+	sli, err := cf.metrics.getSLIMetrics(cf.spec.Feed.Opts[changefeedbase.OptMetricsScope])
+	if err != nil {
+		cf.MoveToDraining(err)
+		return
+	}
 	cf.sink, err = getSink(ctx, cf.flowCtx.Cfg, cf.spec.Feed, nilOracle,
-		cf.spec.User(), cf.spec.JobID, sm)
+		cf.spec.User(), cf.spec.JobID, sli)

 	if err != nil {
 		err = changefeedbase.MarkRetryableError(err)
diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go
index 301558469215..92325bca46b1 100644
--- a/pkg/ccl/changefeedccl/changefeed_stmt.go
+++ b/pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -339,6 +339,25 @@ func changefeedPlanHook(
 		telemetry.Count(`changefeed.create.format.` + details.Opts[changefeedbase.OptFormat])
 		telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(targets)))

+		if scope, ok := opts[changefeedbase.OptMetricsScope]; ok {
+			if err := utilccl.CheckEnterpriseEnabled(
+				p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "CHANGEFEED",
+			); err != nil {
+				return errors.Wrapf(err,
+					"use of the %q option requires an enterprise license", changefeedbase.OptMetricsScope)
+			}
+
+			if scope == defaultSLIScope {
+				return pgerror.Newf(pgcode.InvalidParameterValue,
+					"%[1]q=%[2]q is the default metrics scope, which tracks statistics "+
+						"across all changefeeds without an explicit label. "+
+						"If this is the intended behavior, re-run the statement "+
+						"without the %[1]q parameter. "+
+						"Otherwise, re-run with a different %[1]q value.",
+					changefeedbase.OptMetricsScope, defaultSLIScope)
+			}
+		}
+
 		if details.SinkURI == `` {
 			telemetry.Count(`changefeed.create.core`)
 			err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh)
@@ -356,6 +375,12 @@ func changefeedPlanHook(

 		telemetry.Count(`changefeed.create.enterprise`)

+		metrics := p.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics)
+		sli, err := metrics.getSLIMetrics(opts[changefeedbase.OptMetricsScope])
+		if err != nil {
+			return err
+		}
+
 		// In the case where a user is executing a CREATE CHANGEFEED and is still
 		// waiting for the statement to return, we take the opportunity to ensure
 		// that the user has not made any obvious errors when specifying the sink in
 		// the CREATE CHANGEFEED statement. To do this, we create a "canary" sink,
 		// which will be immediately closed, only to check for errors.
 		{
 			var nilOracle timestampLowerBoundOracle
-			sm := &sinkMetrics{
-				SinkMetrics: p.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).SinkMetrics,
-			}
-			canarySink, err := getSink(
-				ctx, &p.ExecCfg().DistSQLSrv.ServerConfig, details, nilOracle,
-				p.User(), jobspb.InvalidJobID, sm,
-			)
+			canarySink, err := getSink(ctx, &p.ExecCfg().DistSQLSrv.ServerConfig, details,
+				nilOracle, p.User(), jobspb.InvalidJobID, sli)
 			if err != nil {
 				return changefeedbase.MaybeStripRetryableErrorMarker(err)
 			}
@@ -757,7 +777,11 @@ func (b *changefeedResumer) resumeWithRetries(
 		log.Warningf(ctx, `WARNING: CHANGEFEED job %d encountered retryable error: %v`, jobID, err)
 		b.setJobRunningStatus(ctx, "retryable error: %s", err)
 		if metrics, ok := execCfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok {
-			metrics.ErrorRetries.Inc(1)
+			sli, err := metrics.getSLIMetrics(details.Opts[changefeedbase.OptMetricsScope])
+			if err != nil {
+				return err
+			}
+			sli.ErrorRetries.Inc(1)
 		}
 		// Re-load the job in order to update our progress object, which may have
 		// been updated by the changeFrontier processor since the flow started.
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 261647d5bfbf..99cc4b4debf6 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -2404,8 +2404,17 @@ func TestChangefeedMonitoring(t *testing.T) {
 			t.Errorf(`expected 0 got %d`, c)
 		}

-		foo := feed(t, f, `CREATE CHANGEFEED FOR foo`)
-		_, _ = foo.Next()
+		enableSLIMetrics = false
+		foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH metrics_label='tier0'`)
+		_, err := foo.Next()
+		require.Regexp(t, "cannot create metrics scope", err)
+		require.NoError(t, foo.Close())
+
+		enableSLIMetrics = true
+		foo = feed(t, f, `CREATE CHANGEFEED FOR foo WITH metrics_label='tier0'`)
+		_, err = foo.Next()
+		require.NoError(t, err)
+
 		testutils.SucceedsSoon(t, func() error {
 			if c := s.MustGetSQLCounter(`changefeed.emitted_messages`); c != 1 {
 				return errors.Errorf(`expected 1 got %d`, c)
@@ -2509,9 +2518,11 @@ func TestChangefeedRetryableError(t *testing.T) {
 		sqlDB.Exec(t, `INSERT INTO foo VALUES (2)`)

 		registry := f.Server().JobRegistry().(*jobs.Registry)
-		retryCounter := registry.MetricsStruct().Changefeed.(*Metrics).ErrorRetries
+		sli, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics(defaultSLIScope)
+		require.NoError(t, err)
+		retryCounter := sli.ErrorRetries
 		testutils.SucceedsSoon(t, func() error {
-			if retryCounter.Counter.Count() < 3 {
+			if retryCounter.Value() < 3 {
 				return fmt.Errorf("insufficient error retries detected")
 			}
 			return nil
diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go
index ade240119271..8d986f41eaff 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/options.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -49,6 +49,7 @@ const (
 	OptWebhookAuthHeader     = `webhook_auth_header`
 	OptWebhookClientTimeout  = `webhook_client_timeout`
 	OptOnError               = `on_error`
+	OptMetricsScope          = `metrics_label`

 	// OptSchemaChangeEventClassColumnChange corresponds to all schema change
 	// events which add or remove any column.
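Note: registering OptMetricsScope with sql.KVStringOptRequireValue (next hunk) and adding it to CommonOptions means a bare `WITH metrics_label` is rejected during option validation, while the valued form is accepted for every sink type. A hypothetical test-style sketch; the helper names mirror the surrounding tests, but the exact error wording and the sinkURI variable are illustrative assumptions:

    // Hypothetical sketch; error regexp and sinkURI are assumptions.
    sqlDB.ExpectErr(t, `requires a value`,
        `CREATE CHANGEFEED FOR foo WITH metrics_label`)
    sqlDB.Exec(t,
        `CREATE CHANGEFEED FOR foo INTO $1 WITH metrics_label = 'tier0'`, sinkURI)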
@@ -168,6 +169,7 @@ var ChangefeedOptionExpectValues = map[string]sql.KVStringOptValidate{
 	OptWebhookAuthHeader:     sql.KVStringOptRequireValue,
 	OptWebhookClientTimeout:  sql.KVStringOptRequireValue,
 	OptOnError:               sql.KVStringOptRequireValue,
+	OptMetricsScope:          sql.KVStringOptRequireValue,
 }

 func makeStringSet(opts ...string) map[string]struct{} {
@@ -187,7 +189,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEnvelope,
 	OptSchemaChangeEvents, OptSchemaChangePolicy,
 	OptProtectDataFromGCOnPause, OptOnError,
 	OptInitialScan, OptNoInitialScan,
-	OptMinCheckpointFrequency)
+	OptMinCheckpointFrequency, OptMetricsScope)

 // SQLValidOptions is options exclusive to SQL sink
 var SQLValidOptions map[string]struct{} = nil
diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
index 19d074b7f876..20e8172c1ce9 100644
--- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
+++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -46,7 +46,7 @@ type Config struct {
 	Targets            jobspb.ChangefeedTargets
 	Writer             kvevent.Writer
 	Metrics            *kvevent.Metrics
-	OnBackfillCallback func(timestamp hlc.Timestamp)
+	OnBackfillCallback func() func()
 	MM                 *mon.BytesMonitor
 	WithDiff           bool
 	SchemaChangeEvents changefeedbase.SchemaChangeEventClass
@@ -157,7 +157,7 @@ type kvFeed struct {
 	writer kvevent.Writer
 	codec  keys.SQLCodec

-	onBackfillCallback func(timestamp hlc.Timestamp)
+	onBackfillCallback func() func()

 	schemaChangeEvents changefeedbase.SchemaChangeEventClass
 	schemaChangePolicy changefeedbase.SchemaChangePolicy
@@ -269,6 +269,7 @@ func (f *kvFeed) scanIfShould(
 	ctx context.Context, initialScan bool, highWater hlc.Timestamp,
 ) error {
 	scanTime := highWater.Next()
+
 	events, err := f.tableFeed.Peek(ctx, scanTime)
 	if err != nil {
 		return err
@@ -328,8 +329,7 @@ func (f *kvFeed) scanIfShould(
 	}

 	if f.onBackfillCallback != nil {
-		f.onBackfillCallback(scanTime)
-		defer f.onBackfillCallback(hlc.Timestamp{})
+		defer f.onBackfillCallback()()
 	}

 	if err := f.scanner.Scan(ctx, f.writer, physicalConfig{
diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go
index 6fc37047ad58..d7854ea44df9 100644
--- a/pkg/ccl/changefeedccl/metrics.go
+++ b/pkg/ccl/changefeedccl/metrics.go
@@ -9,46 +9,80 @@ package changefeedccl

 import (
+	"strings"
 	"time"

 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
+	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
+	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
+	"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/errors"
 )

-// SinkMetrics are sink specific metrics.
-type SinkMetrics struct {
-	EmittedMessages *metric.Counter
-	EmittedBytes    *metric.Counter
-	FlushedBytes    *metric.Counter
-	BatchHistNanos  *metric.Histogram
-	Flushes         *metric.Counter
-	FlushHistNanos  *metric.Histogram
-	CommitLatency   *metric.Histogram
+// enableSLIMetrics controls whether per-changefeed SLI metric scopes may be created.
+var enableSLIMetrics = envutil.EnvOrDefaultBool(
+	"COCKROACH_EXPERIMENTAL_ENABLE_PER_CHANGEFEED_METRICS", false)
+
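Note: in the kvfeed hunks above, OnBackfillCallback changed from func(timestamp hlc.Timestamp) to func() func(): invoking the callback marks the start of a backfill and returns a closure that marks its end, which pairs naturally with `defer f.onBackfillCallback()()` (the inner call's arguments, including the returned closure, are evaluated at the defer statement; only the outer call is deferred). A minimal, self-contained sketch of the idiom; the gauge type here is illustrative, not the real aggmetric gauge:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // gauge stands in for the BackfillCount gauge used in this diff.
    type gauge struct{ v int64 }

    func (g *gauge) Inc(d int64) { atomic.AddInt64(&g.v, d) }
    func (g *gauge) Dec(d int64) { atomic.AddInt64(&g.v, -d) }
    func (g *gauge) Value() int64 { return atomic.LoadInt64(&g.v) }

    // getBackfillCallback mirrors the sliMetrics method below: calling the
    // returned func marks backfill start and yields a second func marking its end.
    func getBackfillCallback(backfills *gauge) func() func() {
        return func() func() {
            backfills.Inc(1)
            return func() { backfills.Dec(1) }
        }
    }

    func main() {
        var backfills gauge
        onBackfill := getBackfillCallback(&backfills)

        func() {
            defer onBackfill()() // Inc(1) runs now; Dec(1) runs when this func returns.
            fmt.Println("during backfill:", backfills.Value()) // 1
        }()
        fmt.Println("after backfill:", backfills.Value()) // 0
    }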
+// maxSLIScopeNameLen is the maximum length of a metrics scope name.
+const maxSLIScopeNameLen = 128
+
+// defaultSLIScope is the name of the default SLI scope -- i.e. the set of metrics
+// keeping track of all changefeeds which did not have an explicit scope specified.
+const defaultSLIScope = "default"
+
+// AggMetrics are aggregated metrics keeping track of changefeed performance
+// indicators, combined with a limited number of per-changefeed indicators.
+type AggMetrics struct {
+	EmittedMessages *aggmetric.AggCounter
+	EmittedBytes    *aggmetric.AggCounter
+	FlushedBytes    *aggmetric.AggCounter
+	BatchHistNanos  *aggmetric.AggHistogram
+	Flushes         *aggmetric.AggCounter
+	FlushHistNanos  *aggmetric.AggHistogram
+	CommitLatency   *aggmetric.AggHistogram
+	BackfillCount   *aggmetric.AggGauge
+	ErrorRetries    *aggmetric.AggCounter
+	AdmitLatency    *aggmetric.AggHistogram
+
+	// There is always at least one sliMetrics, created for the default scope.
+	mu struct {
+		syncutil.Mutex
+		sliMetrics map[string]*sliMetrics
+	}
 }

 // MetricStruct implements metric.Struct interface.
-func (m *SinkMetrics) MetricStruct() {}
-
-// sinkMetrics annotates global SinkMetrics with per-changefeed information,
-// such as the state of backfill.
-type sinkMetrics struct {
-	*SinkMetrics
-	backfilling syncutil.AtomicBool
+func (a *AggMetrics) MetricStruct() {}
+
+// sliMetrics holds all SLI related metrics aggregated into AggMetrics.
+type sliMetrics struct {
+	EmittedMessages *aggmetric.Counter
+	EmittedBytes    *aggmetric.Counter
+	FlushedBytes    *aggmetric.Counter
+	BatchHistNanos  *aggmetric.Histogram
+	Flushes         *aggmetric.Counter
+	FlushHistNanos  *aggmetric.Histogram
+	CommitLatency   *aggmetric.Histogram
+	ErrorRetries    *aggmetric.Counter
+	AdmitLatency    *aggmetric.Histogram
+	BackfillCount   *aggmetric.Gauge
 }
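Note: each sliMetrics field is an aggmetric child of the matching AggMetrics parent. In pkg/util/metric/aggmetric, the parent exports the aggregate time series while every AddChild adds a series tagged with the builder's label ("scope" here). A rough fragment using only calls that appear in this diff; the metric.Metadata literal is abbreviated:

    // Sketch: the parent aggregates, children carry the scope label.
    b := aggmetric.MakeBuilder("scope")
    emitted := b.Counter(metric.Metadata{Name: "changefeed.emitted_messages"})

    tier0 := emitted.AddChild("tier0") // changefeed.emitted_messages{scope="tier0"}
    tier0.Inc(3)
    // The unlabeled parent series now reports the sum across children (3 here),
    // while the labeled child reports 3 for scope="tier0".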

-// sinkDoesNotCompress is a sentinel value indicating the the sink
+// sinkDoesNotCompress is a sentinel value indicating the sink
 // does not compress the data it emits.
 const sinkDoesNotCompress = -1

 type recordEmittedMessagesCallback func(numMessages int, mvcc hlc.Timestamp, bytes int, compressedBytes int)

-func (m *sinkMetrics) recordEmittedMessages() recordEmittedMessagesCallback {
+func (m *sliMetrics) recordEmittedMessages() recordEmittedMessagesCallback {
 	if m == nil {
 		return func(numMessages int, mvcc hlc.Timestamp, bytes int, compressedBytes int) {}
 	}
@@ -59,7 +93,7 @@ func (m *sinkMetrics) recordEmittedMessages() recordEmittedMessagesCallback {
 	}
 }

-func (m *sinkMetrics) recordEmittedBatch(
+func (m *sliMetrics) recordEmittedBatch(
 	startTime time.Time, numMessages int, mvcc hlc.Timestamp, bytes int, compressedBytes int,
 ) {
 	if m == nil {
@@ -73,12 +107,12 @@ func (m *sinkMetrics) recordEmittedBatch(
 	}
 	m.FlushedBytes.Inc(int64(compressedBytes))
 	m.BatchHistNanos.RecordValue(emitNanos)
-	if !m.backfilling.Get() {
+	if m.BackfillCount.Value() == 0 {
 		m.CommitLatency.RecordValue(timeutil.Since(mvcc.GoTime()).Nanoseconds())
 	}
 }

-func (m *sinkMetrics) recordResolvedCallback() func() {
+func (m *sliMetrics) recordResolvedCallback() func() {
 	if m == nil {
 		return func() {}
 	}
@@ -91,7 +125,7 @@ func (m *sinkMetrics) recordResolvedCallback() func() {
 	}
 }

-func (m *SinkMetrics) recordFlushRequestCallback() func() {
+func (m *sliMetrics) recordFlushRequestCallback() func() {
 	if m == nil {
 		return func() {}
 	}
@@ -104,45 +138,30 @@ func (m *SinkMetrics) recordFlushRequestCallback() func() {
 	}
 }

+func (m *sliMetrics) getBackfillCallback() func() func() {
+	return func() func() {
+		m.BackfillCount.Inc(1)
+		return func() {
+			m.BackfillCount.Dec(1)
+		}
+	}
+}
+
 const (
 	changefeedCheckpointHistMaxLatency = 30 * time.Second
-	changefeedEmitHistMaxLatency       = 30 * time.Second
+	changefeedBatchHistMaxLatency      = 30 * time.Second
 	changefeedFlushHistMaxLatency      = 1 * time.Minute
-	admitLatencyMaxValue               = 60 * time.Second
-	commitLatencyMaxValue              = 10 * 60 * time.Second
+	admitLatencyMaxValue               = 1 * time.Minute
+	commitLatencyMaxValue              = 10 * time.Minute
 )
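Note: the nil-receiver guards in the record* helpers above make a missing sliMetrics a safe no-op, so sinks constructed without metrics need no nil checks at every call site. A short fragment, assuming the surrounding package's types:

    var m *sliMetrics // e.g. a sink wired up without metrics
    record := m.recordEmittedMessages() // nil receiver => no-op callback, no panic
    record(1 /* numMessages */, hlc.Timestamp{}, 0 /* bytes */, sinkDoesNotCompress)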

 var (
-	metaChangefeedEmittedMessages = metric.Metadata{
-		Name:        "changefeed.emitted_messages",
-		Help:        "Messages emitted by all feeds",
-		Measurement: "Messages",
-		Unit:        metric.Unit_COUNT,
-	}
 	metaChangefeedForwardedResolvedMessages = metric.Metadata{
 		Name:        "changefeed.forwarded_resolved_messages",
 		Help:        "Resolved timestamps forwarded from the change aggregator to the change frontier",
 		Measurement: "Messages",
 		Unit:        metric.Unit_COUNT,
 	}
-	metaChangefeedEmittedBytes = metric.Metadata{
-		Name:        "changefeed.emitted_bytes",
-		Help:        "Bytes emitted by all feeds",
-		Measurement: "Bytes",
-		Unit:        metric.Unit_BYTES,
-	}
-	metaChangefeedFlushedBytes = metric.Metadata{
-		Name:        "changefeed.flushed_bytes",
-		Help:        "Bytes emitted by all feeds; maybe different from changefeed.emitted_bytes when compression is enabled",
-		Measurement: "Bytes",
-		Unit:        metric.Unit_BYTES,
-	}
-	metaChangefeedFlushes = metric.Metadata{
-		Name:        "changefeed.flushes",
-		Help:        "Total flushes across all feeds",
-		Measurement: "Flushes",
-		Unit:        metric.Unit_COUNT,
-	}
 	metaChangefeedErrorRetries = metric.Metadata{
 		Name: "changefeed.error_retries",
 		Help: "Total retryable errors encountered by all changefeeds",
@@ -162,25 +181,7 @@ var (
 		Measurement: "Nanoseconds",
 		Unit:        metric.Unit_NANOSECONDS,
 	}
-	metaAdmitLatency = metric.Metadata{
-		Name: "changefeed.admit_latency",
-		Help: "Event admission latency: a difference between event MVCC timestamp " +
-			"and the time it was admitted into changefeed pipeline; " +
-			"Note: this metric includes the time spent waiting until event can be processed due " +
-			"to backpressure or time spent resolving schema descriptors. " +
-			"Also note, this metric excludes latency during backfill",
-		Measurement: "Nanoseconds",
-		Unit:        metric.Unit_NANOSECONDS,
-	}
-	metaCommitLatency = metric.Metadata{
-		Name: "changefeed.commit_latency",
-		Help: "Event commit latency: a difference between event MVCC timestamp " +
-			"and the time it was acknowledged by the downstream sink. If the sink batches events, " +
-			" then the difference between the oldest event in the batch and acknowledgement is recorded; " +
-			"Excludes latency during backfill",
-		Measurement: "Nanoseconds",
-		Unit:        metric.Unit_NANOSECONDS,
-	}
+
 	metaChangefeedRunning = metric.Metadata{
 		Name:        "changefeed.running",
 		Help:        "Number of currently running changefeeds, including sinkless",
 		Measurement: "Changefeeds",
 		Unit:        metric.Unit_COUNT,
 	}
@@ -195,20 +196,6 @@ var (
 		Unit:        metric.Unit_NANOSECONDS,
 	}

-	metaChangefeedBatchHistNanos = metric.Metadata{
-		Name:        "changefeed.sink_batch_hist_nanos",
-		Help:        "Time spent batched in the sink buffer before being being flushed and acknowledged",
-		Measurement: "Changefeeds",
-		Unit:        metric.Unit_NANOSECONDS,
-	}
-
-	metaChangefeedFlushHistNanos = metric.Metadata{
-		Name:        "changefeed.flush_hist_nanos",
-		Help:        "Time spent flushing messages across all changefeeds",
-		Measurement: "Changefeeds",
-		Unit:        metric.Unit_NANOSECONDS,
-	}
-
 	// TODO(dan): This was intended to be a measure of the minimum distance of
 	// any changefeed ahead of its gc ttl threshold, but keeping that correct in
 	// the face of changing zone configs is much harder, so this will have to do
@@ -226,32 +213,162 @@ var (
 		Measurement: "Updates",
 		Unit:        metric.Unit_COUNT,
 	}
-	metaChangefeedBackfillCount = metric.Metadata{
+)
+
+func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
+	metaChangefeedEmittedMessages := metric.Metadata{
+		Name:        "changefeed.emitted_messages",
+		Help:        "Messages emitted by all feeds",
+		Measurement: "Messages",
+		Unit:        metric.Unit_COUNT,
+	}
+	metaChangefeedEmittedBytes := metric.Metadata{
+		Name:        "changefeed.emitted_bytes",
+		Help:        "Bytes emitted by all feeds",
+		Measurement: "Bytes",
+		Unit:        metric.Unit_BYTES,
+	}
+	metaChangefeedFlushedBytes := metric.Metadata{
+		Name:        "changefeed.flushed_bytes",
+		Help:        "Bytes emitted by all feeds; may differ from changefeed.emitted_bytes when compression is enabled",
+		Measurement: "Bytes",
+		Unit:        metric.Unit_BYTES,
+	}
+	metaChangefeedFlushes := metric.Metadata{
+		Name:        "changefeed.flushes",
+		Help:        "Total flushes across all feeds",
+		Measurement: "Flushes",
+		Unit:        metric.Unit_COUNT,
+	}
+	metaChangefeedBatchHistNanos := metric.Metadata{
+		Name:        "changefeed.sink_batch_hist_nanos",
+		Help:        "Time spent batched in the sink buffer before being flushed and acknowledged",
+		Measurement: "Changefeeds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
+	metaChangefeedFlushHistNanos := metric.Metadata{
+		Name:        "changefeed.flush_hist_nanos",
+		Help:        "Time spent flushing messages across all changefeeds",
+		Measurement: "Changefeeds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
+	metaCommitLatency := metric.Metadata{
+		Name: "changefeed.commit_latency",
+		Help: "Event commit latency: the difference between the event MVCC timestamp " +
+			"and the time it was acknowledged by the downstream sink. If the sink batches events, " +
+			"then the difference between the oldest event in the batch and acknowledgement is recorded; " +
+			"Excludes latency during backfill",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
+	metaAdmitLatency := metric.Metadata{
+		Name: "changefeed.admit_latency",
+		Help: "Event admission latency: the difference between the event MVCC timestamp " +
+			"and the time it was admitted into the changefeed pipeline; " +
+			"Note: this metric includes the time spent waiting until the event can be processed due " +
+			"to backpressure or time spent resolving schema descriptors. " +
+			"Also note, this metric excludes latency during backfill",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
+	metaChangefeedBackfillCount := metric.Metadata{
 		Name:        "changefeed.backfill_count",
 		Help:        "Number of changefeeds currently executing backfill",
 		Measurement: "Count",
 		Unit:        metric.Unit_COUNT,
 	}
-)
-
-// Metrics are for production monitoring of changefeeds.
-type Metrics struct {
-	*SinkMetrics
-	KVFeedMetrics     kvevent.Metrics
-	SchemaFeedMetrics schemafeed.Metrics
+	// NB: When adding new histograms, use sigFigs = 1. Older histograms
+	// retain 2 significant figures.
+	b := aggmetric.MakeBuilder("scope")
+	a := &AggMetrics{
+		ErrorRetries:    b.Counter(metaChangefeedErrorRetries),
+		EmittedMessages: b.Counter(metaChangefeedEmittedMessages),
+		EmittedBytes:    b.Counter(metaChangefeedEmittedBytes),
+		FlushedBytes:    b.Counter(metaChangefeedFlushedBytes),
+		Flushes:         b.Counter(metaChangefeedFlushes),

+		BatchHistNanos: b.Histogram(metaChangefeedBatchHistNanos,
+			histogramWindow, changefeedBatchHistMaxLatency.Nanoseconds(), 1),
+		FlushHistNanos: b.Histogram(metaChangefeedFlushHistNanos,
+			histogramWindow, changefeedFlushHistMaxLatency.Nanoseconds(), 2),
+		CommitLatency: b.Histogram(metaCommitLatency,
+			histogramWindow, commitLatencyMaxValue.Nanoseconds(), 1),
+		AdmitLatency: b.Histogram(metaAdmitLatency, histogramWindow,
+			admitLatencyMaxValue.Nanoseconds(), 1),
+		BackfillCount: b.Gauge(metaChangefeedBackfillCount),
+	}
+	a.mu.sliMetrics = make(map[string]*sliMetrics)
+	_, err := a.getOrCreateScope(defaultSLIScope)
+	if err != nil {
+		// defaultSLIScope must always exist.
+		panic(err)
+	}
+	return a
+}
+
+func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
+	a.mu.Lock()
+	defer a.mu.Unlock()
+
+	scope = strings.TrimSpace(strings.ToLower(scope))
+
+	if scope == "" {
+		scope = defaultSLIScope
+	}
+
+	if len(scope) > maxSLIScopeNameLen {
+		return nil, pgerror.Newf(pgcode.ConfigurationLimitExceeded,
+			"scope name length must not exceed %d bytes", maxSLIScopeNameLen)
+	}
+
+	if s, ok := a.mu.sliMetrics[scope]; ok {
+		return s, nil
+	}
+
+	if scope != defaultSLIScope {
+		if !enableSLIMetrics {
+			return nil, errors.WithHint(
+				pgerror.Newf(pgcode.ConfigurationLimitExceeded, "cannot create metrics scope %q", scope),
+				"try restarting with COCKROACH_EXPERIMENTAL_ENABLE_PER_CHANGEFEED_METRICS=true",
+			)
+		}
+		const failSafeMax = 1024
+		if len(a.mu.sliMetrics) == failSafeMax {
+			return nil, pgerror.Newf(pgcode.ConfigurationLimitExceeded,
+				"too many metrics labels; max %d", failSafeMax)
+		}
+	}
-	ErrorRetries     *metric.Counter
-	Failures         *metric.Counter
-	ResolvedMessages *metric.Counter
-	BackfillCount    *metric.Gauge
+
+	sm := &sliMetrics{
+		EmittedMessages: a.EmittedMessages.AddChild(scope),
+		EmittedBytes:    a.EmittedBytes.AddChild(scope),
+		FlushedBytes:    a.FlushedBytes.AddChild(scope),
+		BatchHistNanos:  a.BatchHistNanos.AddChild(scope),
+		Flushes:         a.Flushes.AddChild(scope),
+		FlushHistNanos:  a.FlushHistNanos.AddChild(scope),
+		CommitLatency:   a.CommitLatency.AddChild(scope),
+		ErrorRetries:    a.ErrorRetries.AddChild(scope),
+		AdmitLatency:    a.AdmitLatency.AddChild(scope),
+		BackfillCount:   a.BackfillCount.AddChild(scope),
+	}
+	a.mu.sliMetrics[scope] = sm
+	return sm, nil
+}
+
+// Metrics are for production monitoring of changefeeds.
+type Metrics struct {
+	AggMetrics        *AggMetrics
+	KVFeedMetrics     kvevent.Metrics
+	SchemaFeedMetrics schemafeed.Metrics
+	Failures          *metric.Counter
+	ResolvedMessages  *metric.Counter
 	QueueTimeNanos      *metric.Counter
-	AdmitLatency        *metric.Histogram
 	CheckpointHistNanos *metric.Histogram
 	Running             *metric.Gauge
-
-	FrontierUpdates *metric.Counter
-	ThrottleMetrics cdcutils.Metrics
+	FrontierUpdates     *metric.Counter
+	ThrottleMetrics     cdcutils.Metrics

 	mu struct {
 		syncutil.Mutex
@@ -264,37 +381,22 @@ type Metrics struct {

 // MetricStruct implements the metric.Struct interface.
 func (*Metrics) MetricStruct() {}

+// getSLIMetrics returns the sliMetrics associated with the specified scope.
+func (m *Metrics) getSLIMetrics(scope string) (*sliMetrics, error) {
+	return m.AggMetrics.getOrCreateScope(scope)
+}
+
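Note: a hypothetical unit test (not part of this diff) pinning down the getOrCreateScope contract above. It assumes testing, time, strings, and testify's require are imported, and that the enableSLIMetrics env gate is left at its false default:

    func TestSLIScopeRules(t *testing.T) {
        agg := newAggregateMetrics(time.Minute)

        // The default scope is pre-created; "" and " DEFAULT " normalize to it,
        // since scopes are lower-cased and trimmed before lookup.
        def, err := agg.getOrCreateScope("")
        require.NoError(t, err)
        same, err := agg.getOrCreateScope(" DEFAULT ")
        require.NoError(t, err)
        require.True(t, def == same)

        // Non-default scopes are refused while enableSLIMetrics is false.
        _, err = agg.getOrCreateScope("tier0")
        require.Regexp(t, "cannot create metrics scope", err)

        // Over-long names are rejected regardless of the gate.
        _, err = agg.getOrCreateScope(strings.Repeat("x", maxSLIScopeNameLen+1))
        require.Error(t, err)
    }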
 // MakeMetrics makes the metrics for changefeed monitoring.
 func MakeMetrics(histogramWindow time.Duration) metric.Struct {
 	m := &Metrics{
-		SinkMetrics: &SinkMetrics{
-			EmittedMessages: metric.NewCounter(metaChangefeedEmittedMessages),
-			EmittedBytes:    metric.NewCounter(metaChangefeedEmittedBytes),
-			FlushedBytes:    metric.NewCounter(metaChangefeedFlushedBytes),
-			Flushes:         metric.NewCounter(metaChangefeedFlushes),
-
-			BatchHistNanos: metric.NewHistogram(metaChangefeedBatchHistNanos, histogramWindow,
-				changefeedEmitHistMaxLatency.Nanoseconds(), 1),
-			FlushHistNanos: metric.NewHistogram(metaChangefeedFlushHistNanos, histogramWindow,
-				changefeedFlushHistMaxLatency.Nanoseconds(), 1),
-
-			CommitLatency: metric.NewHistogram(metaCommitLatency, histogramWindow,
-				commitLatencyMaxValue.Nanoseconds(), 1),
-		},
-
+		AggMetrics:        newAggregateMetrics(histogramWindow),
 		KVFeedMetrics:     kvevent.MakeMetrics(histogramWindow),
 		SchemaFeedMetrics: schemafeed.MakeMetrics(histogramWindow),
-		ErrorRetries:      metric.NewCounter(metaChangefeedErrorRetries),
 		ResolvedMessages:  metric.NewCounter(metaChangefeedForwardedResolvedMessages),
 		Failures:          metric.NewCounter(metaChangefeedFailures),
 		QueueTimeNanos:    metric.NewCounter(metaEventQueueTime),
-		AdmitLatency: metric.NewHistogram(metaAdmitLatency, histogramWindow,
-			admitLatencyMaxValue.Nanoseconds(), 1),
-
 		CheckpointHistNanos: metric.NewHistogram(metaChangefeedCheckpointHistNanos, histogramWindow,
 			changefeedCheckpointHistMaxLatency.Nanoseconds(), 2),
-
-		BackfillCount: metric.NewGauge(metaChangefeedBackfillCount),
 		Running:         metric.NewGauge(metaChangefeedRunning),
 		FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
 		ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow),
diff --git a/pkg/ccl/changefeedccl/metrics_scope.go b/pkg/ccl/changefeedccl/metrics_scope.go
deleted file mode 100644
index 3feeaf2db71a..000000000000
--- a/pkg/ccl/changefeedccl/metrics_scope.go
+++ /dev/null
@@ -1,77 +0,0 @@
-// Copyright 2021 The Cockroach Authors.
-//
-// Licensed as a CockroachDB Enterprise file under the Cockroach Community
-// License (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-//     https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
-
-package changefeedccl
-
-import (
-	"fmt"
-
-	"github.com/cockroachdb/cockroach/pkg/util/metric"
-)
-
-// maxSLIScopes is a static limit on the number of SLI scopes -- that is the number
-// of SLI metrics we will keep track of.
-// The limit is static due to metric.Registry limitations.
-const maxSLIScopes = 8
-
-// SLIMetrics is the list of SLI related metrics for changefeeds.
-type SLIMetrics struct {
-	ErrorRetries *metric.Counter
-	// TODO(yevgeniy): Add more SLI related metrics.
-}
-
-// MetricStruct implements metric.Struct interface
-func (*SLIMetrics) MetricStruct() {}
-
-func makeSLIMetrics(prefix string) *SLIMetrics {
-	withPrefix := func(meta metric.Metadata) metric.Metadata {
-		meta.Name = fmt.Sprintf("%s.%s", prefix, meta.Name)
-		return meta
-	}
-
-	return &SLIMetrics{
-		ErrorRetries: metric.NewCounter(withPrefix(metric.Metadata{
-			Name:        "error_retries",
-			Help:        "Total retryable errors encountered this SLI",
-			Measurement: "Errors",
-			Unit:        metric.Unit_COUNT,
-		})),
-	}
-}
-
-// SLIScopes represents a set of SLI related metrics for a particular "scope".
-type SLIScopes struct {
-	Scopes [maxSLIScopes]*SLIMetrics // Exported so that we can register w/ metrics registry.
-	names map[string]*SLIMetrics
-}
-
-// MetricStruct implements metric.Struct interface
-func (*SLIScopes) MetricStruct() {}
-
-// CreateSLIScopes creates changefeed specific SLI scope: a metric.Struct containing
-// SLI specific metrics for each scope.
-// The scopes are statically named "tier", and each metric name
-// contained in SLIMetrics will be prefixed by "changefeed.tier