From c1fde26730b4fc54e4bbc724d1b29f653541f720 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Fri, 1 Nov 2024 11:11:58 -0600 Subject: [PATCH] feat: move metric aggregation to a per-tenant config (#14709) --- docs/sources/shared/configuration.md | 11 +++++--- pkg/loki/modules.go | 1 + pkg/pattern/aggregation/config.go | 12 +++----- pkg/pattern/ingester.go | 42 +++++++++++----------------- pkg/pattern/ingester_test.go | 5 ++++ pkg/pattern/tee_service.go | 12 +++++--- pkg/pattern/tee_service_test.go | 3 ++ pkg/validation/limits.go | 16 +++++++++-- pkg/validation/limits_test.go | 33 ++++++++++++++++++++++ 9 files changed, 92 insertions(+), 43 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 59ae5d71200ab..90d54d30c2989 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -359,10 +359,6 @@ pattern_ingester: # Configures the metric aggregation and storage behavior of the pattern # ingester. metric_aggregation: - # Whether the pattern ingester metric aggregation is enabled. - # CLI flag: -pattern-ingester.metric-aggregation.enabled - [enabled: | default = false] - # How often to downsample metrics from raw push observations. # CLI flag: -pattern-ingester.metric-aggregation.downsample-period [downsample_period: | default = 10s] @@ -3845,6 +3841,13 @@ otlp_config: # CLI flag: -limits.ingestion-partition-tenant-shard-size [ingestion_partitions_tenant_shard_size: | default = 0] +# Enable metric aggregation. When enabled, pushed streams will be sampled for +# bytes and count, and these metric will be written back into Loki as a special +# __aggregated_metric__ stream, which can be queried for faster histogram +# queries. +# CLI flag: -limits.metric-aggregation-enabled +[metric_aggregation_enabled: | default = false] + # S3 server-side encryption type. Required to enable server-side encryption # overrides for a specific tenant. If not set, the default S3 client settings # are used. diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 8e5c76e8828de..1ebbe17118bf7 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -680,6 +680,7 @@ func (t *Loki) initPatternIngesterTee() (services.Service, error) { svc, err := pattern.NewTeeService( t.Cfg.Pattern, + t.Overrides, t.PatternRingClient, t.Cfg.MetricsNamespace, prometheus.DefaultRegisterer, diff --git a/pkg/pattern/aggregation/config.go b/pkg/pattern/aggregation/config.go index b88eb8499ca73..c0c6dd6da6988 100644 --- a/pkg/pattern/aggregation/config.go +++ b/pkg/pattern/aggregation/config.go @@ -9,8 +9,6 @@ import ( ) type Config struct { - // TODO(twhitney): This needs to be a per-tenant config - Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester metric aggregation is enabled."` DownsamplePeriod time.Duration `yaml:"downsample_period"` LokiAddr string `yaml:"loki_address,omitempty" doc:"description=The address of the Loki instance to push aggregated metrics to."` WriteTimeout time.Duration `yaml:"timeout,omitempty" doc:"description=The timeout for writing to Loki."` @@ -27,12 +25,6 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { } func (cfg *Config) RegisterFlagsWithPrefix(fs *flag.FlagSet, prefix string) { - fs.BoolVar( - &cfg.Enabled, - prefix+"metric-aggregation.enabled", - false, - "Flag to enable or disable metric aggregation.", - ) fs.DurationVar( &cfg.DownsamplePeriod, prefix+"metric-aggregation.downsample-period", @@ -105,3 +97,7 @@ func (s *secretValue) Set(val string) error { func (s *secretValue) Get() any { return string(*s) } func (s *secretValue) String() string { return string(*s) } + +type Limits interface { + MetricAggregationEnabled(userID string) bool +} diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go index 60c71920b7d19..90e69f8433333 100644 --- a/pkg/pattern/ingester.go +++ b/pkg/pattern/ingester.go @@ -150,6 +150,7 @@ func (cfg *Config) Validate() error { type Limits interface { drain.Limits + aggregation.Limits } type Ingester struct { @@ -294,29 +295,18 @@ func (i *Ingester) loop() { flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j) defer flushTicker.Stop() - if i.cfg.MetricAggregation.Enabled { - downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod) - defer downsampleTicker.Stop() - for { - select { - case <-flushTicker.C: - i.sweepUsers(false, true) - case t := <-downsampleTicker.C: - downsampleTicker.Reset(i.cfg.MetricAggregation.DownsamplePeriod) - now := model.TimeFromUnixNano(t.UnixNano()) - i.downsampleMetrics(now) - case <-i.loopQuit: - return - } - } - } else { - for { - select { - case <-flushTicker.C: - i.sweepUsers(false, true) - case <-i.loopQuit: - return - } + downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod) + defer downsampleTicker.Stop() + for { + select { + case <-flushTicker.C: + i.sweepUsers(false, true) + case t := <-downsampleTicker.C: + downsampleTicker.Reset(i.cfg.MetricAggregation.DownsamplePeriod) + now := model.TimeFromUnixNano(t.UnixNano()) + i.downsampleMetrics(now) + case <-i.loopQuit: + return } } } @@ -401,7 +391,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { / var writer aggregation.EntryWriter aggCfg := i.cfg.MetricAggregation - if aggCfg.Enabled { + if i.limits.MetricAggregationEnabled(instanceID) { writer, err = aggregation.NewPush( aggCfg.LokiAddr, instanceID, @@ -469,6 +459,8 @@ func (i *Ingester) downsampleMetrics(ts model.Time) { instances := i.getInstances() for _, instance := range instances { - instance.Downsample(ts) + if i.limits.MetricAggregationEnabled(instance.instanceID) { + instance.Downsample(ts) + } } } diff --git a/pkg/pattern/ingester_test.go b/pkg/pattern/ingester_test.go index effa1c1959437..0b1404fe544a8 100644 --- a/pkg/pattern/ingester_test.go +++ b/pkg/pattern/ingester_test.go @@ -341,8 +341,13 @@ func (m *mockEntryWriter) Stop() { type fakeLimits struct { Limits + metricAggregationEnabled bool } func (f *fakeLimits) PatternIngesterTokenizableJSONFields(_ string) []string { return []string{"log", "message", "msg", "msg_", "_msg", "content"} } + +func (f *fakeLimits) MetricAggregationEnabled(_ string) bool { + return f.metricAggregationEnabled +} diff --git a/pkg/pattern/tee_service.go b/pkg/pattern/tee_service.go index c279474cce42e..f94893ca6c91d 100644 --- a/pkg/pattern/tee_service.go +++ b/pkg/pattern/tee_service.go @@ -27,6 +27,7 @@ import ( type TeeService struct { cfg Config + limits Limits logger log.Logger ringClient RingClient wg *sync.WaitGroup @@ -48,6 +49,7 @@ type TeeService struct { func NewTeeService( cfg Config, + limits Limits, ringClient RingClient, metricsNamespace string, registerer prometheus.Registerer, @@ -83,6 +85,7 @@ func NewTeeService( ), ), cfg: cfg, + limits: limits, ringClient: ringClient, wg: &sync.WaitGroup{}, @@ -317,14 +320,15 @@ func (ts *TeeService) sendBatch(ctx context.Context, clientRequest clientRequest ts.ingesterAppends.WithLabelValues(clientRequest.ingesterAddr, "fail").Inc() level.Error(ts.logger).Log("msg", "failed to send patterns to pattern ingester", "err", err) - if !ts.cfg.MetricAggregation.Enabled { - return err - } - // Pattern ingesters serve 2 functions, processing patterns and aggregating metrics. // Only owned streams are processed for patterns, however any pattern ingester can // aggregate metrics for any stream. Therefore, if we can't send the owned stream, // try to forward request to any pattern ingester so we at least capture the metrics. + + if !ts.limits.MetricAggregationEnabled(clientRequest.tenant) { + return err + } + replicationSet, err := ts.ringClient.Ring(). GetReplicationSetForOperation(ring.WriteNoExtend) if err != nil || len(replicationSet.Instances) == 0 { diff --git a/pkg/pattern/tee_service_test.go b/pkg/pattern/tee_service_test.go index 1be8114df0220..0fb8a032f062a 100644 --- a/pkg/pattern/tee_service_test.go +++ b/pkg/pattern/tee_service_test.go @@ -47,6 +47,9 @@ func getTestTee(t *testing.T) (*TeeService, *mockPoolClient) { logsTee, err := NewTeeService( cfg, + &fakeLimits{ + metricAggregationEnabled: true, + }, ringClient, "test", nil, diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index aeb4ee436faa5..5d819c8d88b50 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -229,8 +229,9 @@ type Limits struct { IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"` PatternIngesterTokenizableJSONFieldsDefault dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_default" json:"pattern_ingester_tokenizable_json_fields_default" doc:"hidden"` - PatternIngesterTokenizableJSONFieldsAppend dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_append" json:"pattern_ingester_tokenizable_json_fields_append" doc:"hidden"` - PatternIngesterTokenizableJSONFieldsDelete dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_delete" json:"pattern_ingester_tokenizable_json_fields_delete" doc:"hidden"` + PatternIngesterTokenizableJSONFieldsAppend dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_append" json:"pattern_ingester_tokenizable_json_fields_append" doc:"hidden"` + PatternIngesterTokenizableJSONFieldsDelete dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_delete" json:"pattern_ingester_tokenizable_json_fields_delete" doc:"hidden"` + MetricAggregationEnabled bool `yaml:"metric_aggregation_enabled" json:"metric_aggregation_enabled"` // This config doesn't have a CLI flag registered here because they're registered in // their own original config struct. @@ -438,6 +439,13 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Var(&l.PatternIngesterTokenizableJSONFieldsDefault, "limits.pattern-ingester-tokenizable-json-fields", "List of JSON fields that should be tokenized in the pattern ingester.") f.Var(&l.PatternIngesterTokenizableJSONFieldsAppend, "limits.pattern-ingester-tokenizable-json-fields-append", "List of JSON fields that should be appended to the default list of tokenizable fields in the pattern ingester.") f.Var(&l.PatternIngesterTokenizableJSONFieldsDelete, "limits.pattern-ingester-tokenizable-json-fields-delete", "List of JSON fields that should be deleted from the (default U append) list of tokenizable fields in the pattern ingester.") + + f.BoolVar( + &l.MetricAggregationEnabled, + "limits.metric-aggregation-enabled", + false, + "Enable metric aggregation. When enabled, pushed streams will be sampled for bytes and count, and these metric will be written back into Loki as a special __aggregated_metric__ stream, which can be queried for faster histogram queries.", + ) } // SetGlobalOTLPConfig set GlobalOTLPConfig which is used while unmarshaling per-tenant otlp config to use the default list of resource attributes picked as index labels. @@ -1113,6 +1121,10 @@ func (o *Overrides) PatternIngesterTokenizableJSONFieldsDelete(userID string) [] return o.getOverridesForUser(userID).PatternIngesterTokenizableJSONFieldsDelete } +func (o *Overrides) MetricAggregationEnabled(userID string) bool { + return o.getOverridesForUser(userID).MetricAggregationEnabled +} + // S3SSEType returns the per-tenant S3 SSE type. func (o *Overrides) S3SSEType(user string) string { return o.getOverridesForUser(user).S3SSEType diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go index bfb522f73a2e6..71252c3b6cc82 100644 --- a/pkg/validation/limits_test.go +++ b/pkg/validation/limits_test.go @@ -414,3 +414,36 @@ pattern_ingester_tokenizable_json_fields_delete: body }) } } + +func Test_MetricAggregationEnabled(t *testing.T) { + for _, tc := range []struct { + name string + yaml string + expected bool + }{ + { + name: "when true", + yaml: ` +metric_aggregation_enabled: true +`, + expected: true, + }, + { + name: "when false", + yaml: ` +metric_aggregation_enabled: false +`, + expected: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + overrides := Overrides{ + defaultLimits: &Limits{}, + } + require.NoError(t, yaml.Unmarshal([]byte(tc.yaml), overrides.defaultLimits)) + + actual := overrides.MetricAggregationEnabled("fake") + require.Equal(t, tc.expected, actual) + }) + } +}