Skip to content

Commit

Permalink
changefeedccl: Add ability to track per-changefeed metrics.
Browse files Browse the repository at this point in the history
Changefeeds can now be created with a new option: `metrics_label`.

Metrics labels allow the operator to group metrics related to
service level indicators (SLIs) by attaching a label to those metrics.
Examples of SLI related metrics are:
  * Error rates
  * Number of emitted messages and the size of data (pre & post compression).
  * Admit and commit message latency

This experimental feature is controlled via an environment variable
`COCKROACH_EXPERIMENTAL_ENABLE_PER_CHANGEFEED_METRICS` which can be set to
true to enable this feature.

While the administrator is free to create as many of those scoped labels
as they need (up to a hard limit of 1024), an excessive number of labels should
be avoided since each custom label increases the cardinality of the metrics
exported to Prometheus.

SLI scopes let operators label one or more changefeeds, based on criteria
of their choosing.  This labeling effectively creates a monitoring scope,
based on the relative priority of those changefeeds, which lets the operator
monitor those changefeeds independently of other changefeeds.
For example, production changefeeds might fall under the `prod` scope,
while less important changefeeds might fall under the `low_pri` scope.

The use of `metrics_label` is supported by enterprise changefeeds only.

Release Notes (enterprise change): Changefeeds can be created with a new
option called `metrics_label` which lets operators configure changefeeds to use
a dedicated set of metrics for those changefeed(s) so that they can be monitored
independently of other changefeed(s) in the system.
  • Loading branch information
Yevgeniy Miretskiy committed Nov 19, 2021
1 parent 35e3995 commit 707af75
Show file tree
Hide file tree
Showing 15 changed files with 323 additions and 308 deletions.
5 changes: 2 additions & 3 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ go_library(
"doc.go",
"encoder.go",
"metrics.go",
"metrics_scope.go",
"name.go",
"rowfetcher_cache.go",
"schema_registry.go",
Expand Down Expand Up @@ -84,6 +83,7 @@ go_library(
"//pkg/util/ctxgroup",
"//pkg/util/duration",
"//pkg/util/encoding",
"//pkg/util/envutil",
"//pkg/util/errorutil",
"//pkg/util/hlc",
"//pkg/util/httputil",
Expand All @@ -92,6 +92,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/log/logcrash",
"//pkg/util/metric",
"//pkg/util/metric/aggmetric",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/retry",
Expand Down Expand Up @@ -123,7 +124,6 @@ go_test(
"helpers_tenant_shim_test.go",
"helpers_test.go",
"main_test.go",
"metrics_scope_test.go",
"name_test.go",
"nemeses_test.go",
"schema_registry_test.go",
Expand Down Expand Up @@ -202,7 +202,6 @@ go_test(
"//pkg/util/json",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/randutil",
Expand Down
50 changes: 25 additions & 25 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ type changeAggregator struct {
// boundary information.
frontier *schemaChangeFrontier

metrics *Metrics
knobs TestingKnobs
metrics *Metrics
sliMetrics *sliMetrics
knobs TestingKnobs
}

type timestampLowerBoundOracle interface {
Expand Down Expand Up @@ -245,9 +246,15 @@ func (ca *changeAggregator) Start(ctx context.Context) {
// runs. They're all stored as the `metric.Struct` interface because of
// dependency cycles.
ca.metrics = ca.flowCtx.Cfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics)
sm := &sinkMetrics{SinkMetrics: ca.metrics.SinkMetrics}
ca.sliMetrics, err = ca.metrics.getSLIMetrics(ca.spec.Feed.Opts[changefeedbase.OptMetricsScope])
if err != nil {
ca.MoveToDraining(err)
ca.cancel()
return
}

ca.sink, err = getSink(ctx, ca.flowCtx.Cfg, ca.spec.Feed, timestampOracle,
ca.spec.User(), ca.spec.JobID, sm)
ca.spec.User(), ca.spec.JobID, ca.sliMetrics)

if err != nil {
err = changefeedbase.MarkRetryableError(err)
Expand All @@ -265,7 +272,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {

ca.sink = &errorWrapperSink{wrapped: ca.sink}

ca.eventProducer, err = ca.startKVFeed(ctx, spans, initialHighWater, needsInitialScan, sm)
ca.eventProducer, err = ca.startKVFeed(ctx, spans, initialHighWater, needsInitialScan, ca.sliMetrics)
if err != nil {
// Early abort in the case that there is an error creating the sink.
ca.MoveToDraining(err)
Expand All @@ -287,7 +294,7 @@ func (ca *changeAggregator) startKVFeed(
spans []roachpb.Span,
initialHighWater hlc.Timestamp,
needsInitialScan bool,
sm *sinkMetrics,
sm *sliMetrics,
) (kvevent.Reader, error) {
cfg := ca.flowCtx.Cfg
buf := kvevent.NewThrottlingBuffer(
Expand Down Expand Up @@ -327,7 +334,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
buf kvevent.Writer,
initialHighWater hlc.Timestamp,
needsInitialScan bool,
sm *sinkMetrics,
sm *sliMetrics,
) kvfeed.Config {
schemaChangeEvents := changefeedbase.SchemaChangeEventClass(
ca.spec.Feed.Opts[changefeedbase.OptSchemaChangeEvents])
Expand All @@ -344,16 +351,6 @@ func (ca *changeAggregator) makeKVFeedCfg(
initialHighWater, &ca.metrics.SchemaFeedMetrics)
}

onBackfillCb := func(backfillTS hlc.Timestamp) {
if backfillTS.IsEmpty() {
sm.backfilling.Set(false)
ca.metrics.BackfillCount.Dec(1)
} else {
sm.backfilling.Set(true)
ca.metrics.BackfillCount.Inc(1)
}
}

return kvfeed.Config{
Writer: buf,
Settings: cfg.Settings,
Expand All @@ -365,7 +362,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
BackfillCheckpoint: ca.spec.Checkpoint.Spans,
Targets: ca.spec.Feed.Targets,
Metrics: &ca.metrics.KVFeedMetrics,
OnBackfillCallback: onBackfillCb,
OnBackfillCallback: ca.sliMetrics.getBackfillCallback(),
MM: ca.kvFeedMemMon,
InitialHighWater: initialHighWater,
WithDiff: withDiff,
Expand Down Expand Up @@ -518,13 +515,12 @@ func (ca *changeAggregator) tick() error {
queuedNanos := timeutil.Since(event.BufferAddTimestamp()).Nanoseconds()
ca.metrics.QueueTimeNanos.Inc(queuedNanos)

// Keep track of SLI latency for non-backfill/rangefeed KV events.
if event.Type() == kvevent.TypeKV && event.BackfillTimestamp().IsEmpty() {
ca.metrics.AdmitLatency.RecordValue(timeutil.Since(event.Timestamp().GoTime()).Nanoseconds())
}

switch event.Type() {
case kvevent.TypeKV:
// Keep track of SLI latency for non-backfill/rangefeed KV events.
if event.BackfillTimestamp().IsEmpty() {
ca.sliMetrics.AdmitLatency.RecordValue(timeutil.Since(event.Timestamp().GoTime()).Nanoseconds())
}
return ca.eventConsumer.ConsumeEvent(ca.Ctx, event)
case kvevent.TypeResolved:
a := event.DetachAlloc()
Expand Down Expand Up @@ -1108,9 +1104,13 @@ func (cf *changeFrontier) Start(ctx context.Context) {
// but the oracle is only used when emitting row updates.
var nilOracle timestampLowerBoundOracle
var err error
sm := &sinkMetrics{SinkMetrics: cf.metrics.SinkMetrics}
sli, err := cf.metrics.getSLIMetrics(cf.spec.Feed.Opts[changefeedbase.OptMetricsScope])
if err != nil {
cf.MoveToDraining(err)
return
}
cf.sink, err = getSink(ctx, cf.flowCtx.Cfg, cf.spec.Feed, nilOracle,
cf.spec.User(), cf.spec.JobID, sm)
cf.spec.User(), cf.spec.JobID, sli)

if err != nil {
err = changefeedbase.MarkRetryableError(err)
Expand Down
40 changes: 32 additions & 8 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,25 @@ func changefeedPlanHook(
telemetry.Count(`changefeed.create.format.` + details.Opts[changefeedbase.OptFormat])
telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(targets)))

if scope, ok := opts[changefeedbase.OptMetricsScope]; ok {
if err := utilccl.CheckEnterpriseEnabled(
p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "CHANGEFEED",
); err != nil {
return errors.Wrapf(err,
"use of %q option requires enterprise license.", changefeedbase.OptMetricsScope)
}

if scope == defaultSLIScope {
return pgerror.Newf(pgcode.InvalidParameterValue,
"%[1]q=%[2]q is the default metrics scope which keeps track of statistics "+
"across all changefeeds without explicit label. "+
"If this is an intended behavior, please re-run the statement "+
"without specifying %[1]q parameter. "+
"Otherwise, please re-run with a different %[1]q value.",
changefeedbase.OptMetricsScope, defaultSLIScope)
}
}

if details.SinkURI == `` {
telemetry.Count(`changefeed.create.core`)
err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh)
Expand All @@ -356,20 +375,21 @@ func changefeedPlanHook(

telemetry.Count(`changefeed.create.enterprise`)

metrics := p.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics)
sli, err := metrics.getSLIMetrics(opts[changefeedbase.OptMetricsScope])
if err != nil {
return err
}

// In the case where a user is executing a CREATE CHANGEFEED and is still
// waiting for the statement to return, we take the opportunity to ensure
// that the user has not made any obvious errors when specifying the sink in
// the CREATE CHANGEFEED statement. To do this, we create a "canary" sink,
// which will be immediately closed, only to check for errors.
{
var nilOracle timestampLowerBoundOracle
sm := &sinkMetrics{
SinkMetrics: p.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).SinkMetrics,
}
canarySink, err := getSink(
ctx, &p.ExecCfg().DistSQLSrv.ServerConfig, details, nilOracle,
p.User(), jobspb.InvalidJobID, sm,
)
canarySink, err := getSink(ctx, &p.ExecCfg().DistSQLSrv.ServerConfig, details,
nilOracle, p.User(), jobspb.InvalidJobID, sli)
if err != nil {
return changefeedbase.MaybeStripRetryableErrorMarker(err)
}
Expand Down Expand Up @@ -757,7 +777,11 @@ func (b *changefeedResumer) resumeWithRetries(
log.Warningf(ctx, `WARNING: CHANGEFEED job %d encountered retryable error: %v`, jobID, err)
b.setJobRunningStatus(ctx, "retryable error: %s", err)
if metrics, ok := execCfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok {
metrics.ErrorRetries.Inc(1)
sli, err := metrics.getSLIMetrics(details.Opts[changefeedbase.OptMetricsScope])
if err != nil {
return err
}
sli.ErrorRetries.Inc(1)
}
// Re-load the job in order to update our progress object, which may have
// been updated by the changeFrontier processor since the flow started.
Expand Down
19 changes: 15 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2404,8 +2404,17 @@ func TestChangefeedMonitoring(t *testing.T) {
t.Errorf(`expected 0 got %d`, c)
}

foo := feed(t, f, `CREATE CHANGEFEED FOR foo`)
_, _ = foo.Next()
enableSLIMetrics = false
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH metrics_label='tier0'`)
_, err := foo.Next()
require.Regexp(t, "cannot create metrics scope", err)
require.NoError(t, foo.Close())

enableSLIMetrics = true
foo = feed(t, f, `CREATE CHANGEFEED FOR foo WITH metrics_label='tier0'`)
_, err = foo.Next()
require.NoError(t, err)

testutils.SucceedsSoon(t, func() error {
if c := s.MustGetSQLCounter(`changefeed.emitted_messages`); c != 1 {
return errors.Errorf(`expected 1 got %d`, c)
Expand Down Expand Up @@ -2509,9 +2518,11 @@ func TestChangefeedRetryableError(t *testing.T) {
sqlDB.Exec(t, `INSERT INTO foo VALUES (2)`)
registry := f.Server().JobRegistry().(*jobs.Registry)

retryCounter := registry.MetricsStruct().Changefeed.(*Metrics).ErrorRetries
sli, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics(defaultSLIScope)
require.NoError(t, err)
retryCounter := sli.ErrorRetries
testutils.SucceedsSoon(t, func() error {
if retryCounter.Counter.Count() < 3 {
if retryCounter.Value() < 3 {
return fmt.Errorf("insufficient error retries detected")
}
return nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
OptWebhookAuthHeader = `webhook_auth_header`
OptWebhookClientTimeout = `webhook_client_timeout`
OptOnError = `on_error`
OptMetricsScope = `metrics_label`

// OptSchemaChangeEventClassColumnChange corresponds to all schema change
// events which add or remove any column.
Expand Down Expand Up @@ -168,6 +169,7 @@ var ChangefeedOptionExpectValues = map[string]sql.KVStringOptValidate{
OptWebhookAuthHeader: sql.KVStringOptRequireValue,
OptWebhookClientTimeout: sql.KVStringOptRequireValue,
OptOnError: sql.KVStringOptRequireValue,
OptMetricsScope: sql.KVStringOptRequireValue,
}

func makeStringSet(opts ...string) map[string]struct{} {
Expand All @@ -187,7 +189,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEnvelope,
OptSchemaChangeEvents, OptSchemaChangePolicy,
OptProtectDataFromGCOnPause, OptOnError,
OptInitialScan, OptNoInitialScan,
OptMinCheckpointFrequency)
OptMinCheckpointFrequency, OptMetricsScope)

// SQLValidOptions is the set of options exclusive to the SQL sink.
var SQLValidOptions map[string]struct{} = nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Config struct {
Targets jobspb.ChangefeedTargets
Writer kvevent.Writer
Metrics *kvevent.Metrics
OnBackfillCallback func(timestamp hlc.Timestamp)
OnBackfillCallback func() func()
MM *mon.BytesMonitor
WithDiff bool
SchemaChangeEvents changefeedbase.SchemaChangeEventClass
Expand Down Expand Up @@ -157,7 +157,7 @@ type kvFeed struct {
writer kvevent.Writer
codec keys.SQLCodec

onBackfillCallback func(timestamp hlc.Timestamp)
onBackfillCallback func() func()
schemaChangeEvents changefeedbase.SchemaChangeEventClass
schemaChangePolicy changefeedbase.SchemaChangePolicy

Expand Down Expand Up @@ -269,6 +269,7 @@ func (f *kvFeed) scanIfShould(
ctx context.Context, initialScan bool, highWater hlc.Timestamp,
) error {
scanTime := highWater.Next()

events, err := f.tableFeed.Peek(ctx, scanTime)
if err != nil {
return err
Expand Down Expand Up @@ -328,8 +329,7 @@ func (f *kvFeed) scanIfShould(
}

if f.onBackfillCallback != nil {
f.onBackfillCallback(scanTime)
defer f.onBackfillCallback(hlc.Timestamp{})
defer f.onBackfillCallback()()
}

if err := f.scanner.Scan(ctx, f.writer, physicalConfig{
Expand Down
Loading

0 comments on commit 707af75

Please sign in to comment.