diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index e6dfa17fb843a..51220656c9fc8 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -359,10 +359,6 @@ pattern_ingester: # Configures the metric aggregation and storage behavior of the pattern # ingester. metric_aggregation: - # Whether the pattern ingester metric aggregation is enabled. - # CLI flag: -pattern-ingester.metric-aggregation.enabled - [enabled: <boolean> | default = false] - # How often to downsample metrics from raw push observations. # CLI flag: -pattern-ingester.metric-aggregation.downsample-period [downsample_period: <duration> | default = 10s] @@ -3844,6 +3840,27 @@ otlp_config: # disables shuffle sharding and tenant is sharded across all partitions. # CLI flag: -limits.ingestion-partition-tenant-shard-size [ingestion_partitions_tenant_shard_size: <int> | default = 0] + +# Enable metric aggregation. When enabled, pushed streams will be sampled for +# bytes and count, and these metrics will be written back into Loki as a special +# __aggregated_metric__ stream, which can be queried for faster histogram +# queries. +# CLI flag: -limits.metric-aggregation-enabled +[metric_aggregation_enabled: <boolean> | default = false] + +# S3 server-side encryption type. Required to enable server-side encryption +# overrides for a specific tenant. If not set, the default S3 client settings +# are used. +[s3_sse_type: <string> | default = ""] + +# S3 server-side encryption KMS Key ID. Ignored if the SSE type override is not +# set. +[s3_sse_kms_key_id: <string> | default = ""] + +# S3 server-side encryption KMS encryption context. If unset and the key ID +# override is set, the encryption context will not be provided to S3. Ignored if +# the SSE type override is not set. 
+[s3_sse_kms_encryption_context: <string> | default = ""] ``` ### local_storage_config diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 86b5e668167cb..a87f03d1210c6 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -680,6 +680,7 @@ func (t *Loki) initPatternIngesterTee() (services.Service, error) { svc, err := pattern.NewTeeService( t.Cfg.Pattern, + t.Overrides, t.PatternRingClient, t.Cfg.MetricsNamespace, prometheus.DefaultRegisterer, diff --git a/pkg/pattern/aggregation/config.go b/pkg/pattern/aggregation/config.go index b88eb8499ca73..c0c6dd6da6988 100644 --- a/pkg/pattern/aggregation/config.go +++ b/pkg/pattern/aggregation/config.go @@ -9,8 +9,6 @@ import ( ) type Config struct { - // TODO(twhitney): This needs to be a per-tenant config - Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester metric aggregation is enabled."` DownsamplePeriod time.Duration `yaml:"downsample_period"` LokiAddr string `yaml:"loki_address,omitempty" doc:"description=The address of the Loki instance to push aggregated metrics to."` WriteTimeout time.Duration `yaml:"timeout,omitempty" doc:"description=The timeout for writing to Loki."` @@ -27,12 +25,6 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { } func (cfg *Config) RegisterFlagsWithPrefix(fs *flag.FlagSet, prefix string) { - fs.BoolVar( - &cfg.Enabled, - prefix+"metric-aggregation.enabled", - false, - "Flag to enable or disable metric aggregation.", - ) fs.DurationVar( &cfg.DownsamplePeriod, prefix+"metric-aggregation.downsample-period", @@ -105,3 +97,7 @@ func (s *secretValue) Set(val string) error { func (s *secretValue) Get() any { return string(*s) } func (s *secretValue) String() string { return string(*s) } + +type Limits interface { + MetricAggregationEnabled(userID string) bool +} diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go index 60c71920b7d19..90e69f8433333 100644 --- a/pkg/pattern/ingester.go +++ b/pkg/pattern/ingester.go @@ -150,6 +150,7 @@ func 
(cfg *Config) Validate() error { type Limits interface { drain.Limits + aggregation.Limits } type Ingester struct { @@ -294,29 +295,18 @@ func (i *Ingester) loop() { flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j) defer flushTicker.Stop() - if i.cfg.MetricAggregation.Enabled { - downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod) - defer downsampleTicker.Stop() - for { - select { - case <-flushTicker.C: - i.sweepUsers(false, true) - case t := <-downsampleTicker.C: - downsampleTicker.Reset(i.cfg.MetricAggregation.DownsamplePeriod) - now := model.TimeFromUnixNano(t.UnixNano()) - i.downsampleMetrics(now) - case <-i.loopQuit: - return - } - } - } else { - for { - select { - case <-flushTicker.C: - i.sweepUsers(false, true) - case <-i.loopQuit: - return - } + downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod) + defer downsampleTicker.Stop() + for { + select { + case <-flushTicker.C: + i.sweepUsers(false, true) + case t := <-downsampleTicker.C: + downsampleTicker.Reset(i.cfg.MetricAggregation.DownsamplePeriod) + now := model.TimeFromUnixNano(t.UnixNano()) + i.downsampleMetrics(now) + case <-i.loopQuit: + return } } } @@ -401,7 +391,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { / var writer aggregation.EntryWriter aggCfg := i.cfg.MetricAggregation - if aggCfg.Enabled { + if i.limits.MetricAggregationEnabled(instanceID) { writer, err = aggregation.NewPush( aggCfg.LokiAddr, instanceID, @@ -469,6 +459,8 @@ func (i *Ingester) downsampleMetrics(ts model.Time) { instances := i.getInstances() for _, instance := range instances { - instance.Downsample(ts) + if i.limits.MetricAggregationEnabled(instance.instanceID) { + instance.Downsample(ts) + } } } diff --git a/pkg/pattern/ingester_test.go b/pkg/pattern/ingester_test.go index effa1c1959437..0b1404fe544a8 100644 --- a/pkg/pattern/ingester_test.go +++ b/pkg/pattern/ingester_test.go @@ -341,8 +341,13 @@ func (m 
*mockEntryWriter) Stop() { type fakeLimits struct { Limits + metricAggregationEnabled bool } func (f *fakeLimits) PatternIngesterTokenizableJSONFields(_ string) []string { return []string{"log", "message", "msg", "msg_", "_msg", "content"} } + +func (f *fakeLimits) MetricAggregationEnabled(_ string) bool { + return f.metricAggregationEnabled +} diff --git a/pkg/pattern/tee_service.go b/pkg/pattern/tee_service.go index c279474cce42e..f94893ca6c91d 100644 --- a/pkg/pattern/tee_service.go +++ b/pkg/pattern/tee_service.go @@ -27,6 +27,7 @@ import ( type TeeService struct { cfg Config + limits Limits logger log.Logger ringClient RingClient wg *sync.WaitGroup @@ -48,6 +49,7 @@ type TeeService struct { func NewTeeService( cfg Config, + limits Limits, ringClient RingClient, metricsNamespace string, registerer prometheus.Registerer, @@ -83,6 +85,7 @@ func NewTeeService( ), ), cfg: cfg, + limits: limits, ringClient: ringClient, wg: &sync.WaitGroup{}, @@ -317,14 +320,15 @@ func (ts *TeeService) sendBatch(ctx context.Context, clientRequest clientRequest ts.ingesterAppends.WithLabelValues(clientRequest.ingesterAddr, "fail").Inc() level.Error(ts.logger).Log("msg", "failed to send patterns to pattern ingester", "err", err) - if !ts.cfg.MetricAggregation.Enabled { - return err - } - // Pattern ingesters serve 2 functions, processing patterns and aggregating metrics. // Only owned streams are processed for patterns, however any pattern ingester can // aggregate metrics for any stream. Therefore, if we can't send the owned stream, // try to forward request to any pattern ingester so we at least capture the metrics. + + if !ts.limits.MetricAggregationEnabled(clientRequest.tenant) { + return err + } + replicationSet, err := ts.ringClient.Ring(). 
GetReplicationSetForOperation(ring.WriteNoExtend) if err != nil || len(replicationSet.Instances) == 0 { diff --git a/pkg/pattern/tee_service_test.go b/pkg/pattern/tee_service_test.go index 1be8114df0220..0fb8a032f062a 100644 --- a/pkg/pattern/tee_service_test.go +++ b/pkg/pattern/tee_service_test.go @@ -47,6 +47,9 @@ func getTestTee(t *testing.T) (*TeeService, *mockPoolClient) { logsTee, err := NewTeeService( cfg, + &fakeLimits{ + metricAggregationEnabled: true, + }, ringClient, "test", nil, diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 0f34448c6270c..5788cdfa41a8a 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -229,8 +229,15 @@ type Limits struct { IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"` PatternIngesterTokenizableJSONFieldsDefault dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_default" json:"pattern_ingester_tokenizable_json_fields_default" doc:"hidden"` - PatternIngesterTokenizableJSONFieldsAppend dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_append" json:"pattern_ingester_tokenizable_json_fields_append" doc:"hidden"` - PatternIngesterTokenizableJSONFieldsDelete dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_delete" json:"pattern_ingester_tokenizable_json_fields_delete" doc:"hidden"` + PatternIngesterTokenizableJSONFieldsAppend dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_append" json:"pattern_ingester_tokenizable_json_fields_append" doc:"hidden"` + PatternIngesterTokenizableJSONFieldsDelete dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_delete" json:"pattern_ingester_tokenizable_json_fields_delete" doc:"hidden"` + MetricAggregationEnabled bool `yaml:"metric_aggregation_enabled" json:"metric_aggregation_enabled"` + + // This config doesn't have a 
CLI flag registered here because they're registered in + // their own original config struct. + S3SSEType string `yaml:"s3_sse_type" json:"s3_sse_type" doc:"nocli|description=S3 server-side encryption type. Required to enable server-side encryption overrides for a specific tenant. If not set, the default S3 client settings are used."` + S3SSEKMSKeyID string `yaml:"s3_sse_kms_key_id" json:"s3_sse_kms_key_id" doc:"nocli|description=S3 server-side encryption KMS Key ID. Ignored if the SSE type override is not set."` + S3SSEKMSEncryptionContext string `yaml:"s3_sse_kms_encryption_context" json:"s3_sse_kms_encryption_context" doc:"nocli|description=S3 server-side encryption KMS encryption context. If unset and the key ID override is set, the encryption context will not be provided to S3. Ignored if the SSE type override is not set."` } type StreamRetention struct { @@ -431,6 +438,13 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Var(&l.PatternIngesterTokenizableJSONFieldsDefault, "limits.pattern-ingester-tokenizable-json-fields", "List of JSON fields that should be tokenized in the pattern ingester.") f.Var(&l.PatternIngesterTokenizableJSONFieldsAppend, "limits.pattern-ingester-tokenizable-json-fields-append", "List of JSON fields that should be appended to the default list of tokenizable fields in the pattern ingester.") f.Var(&l.PatternIngesterTokenizableJSONFieldsDelete, "limits.pattern-ingester-tokenizable-json-fields-delete", "List of JSON fields that should be deleted from the (default U append) list of tokenizable fields in the pattern ingester.") + + f.BoolVar( + &l.MetricAggregationEnabled, + "limits.metric-aggregation-enabled", + false, + "Enable metric aggregation. 
When enabled, pushed streams will be sampled for bytes and count, and these metrics will be written back into Loki as a special __aggregated_metric__ stream, which can be queried for faster histogram queries.", + ) } // SetGlobalOTLPConfig set GlobalOTLPConfig which is used while unmarshaling per-tenant otlp config to use the default list of resource attributes picked as index labels. @@ -1106,6 +1120,25 @@ func (o *Overrides) PatternIngesterTokenizableJSONFieldsDelete(userID string) [] return o.getOverridesForUser(userID).PatternIngesterTokenizableJSONFieldsDelete } +func (o *Overrides) MetricAggregationEnabled(userID string) bool { + return o.getOverridesForUser(userID).MetricAggregationEnabled +} + +// S3SSEType returns the per-tenant S3 SSE type. +func (o *Overrides) S3SSEType(user string) string { + return o.getOverridesForUser(user).S3SSEType +} + +// S3SSEKMSKeyID returns the per-tenant S3 KMS-SSE key id. +func (o *Overrides) S3SSEKMSKeyID(user string) string { + return o.getOverridesForUser(user).S3SSEKMSKeyID +} + +// S3SSEKMSEncryptionContext returns the per-tenant S3 KMS-SSE encryption context. 
+func (o *Overrides) S3SSEKMSEncryptionContext(user string) string { + return o.getOverridesForUser(user).S3SSEKMSEncryptionContext +} + func (o *Overrides) getOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { l := o.tenantLimits.TenantLimits(userID) diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go index bfb522f73a2e6..71252c3b6cc82 100644 --- a/pkg/validation/limits_test.go +++ b/pkg/validation/limits_test.go @@ -414,3 +414,36 @@ pattern_ingester_tokenizable_json_fields_delete: body }) } } + +func Test_MetricAggregationEnabled(t *testing.T) { + for _, tc := range []struct { + name string + yaml string + expected bool + }{ + { + name: "when true", + yaml: ` +metric_aggregation_enabled: true +`, + expected: true, + }, + { + name: "when false", + yaml: ` +metric_aggregation_enabled: false +`, + expected: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + overrides := Overrides{ + defaultLimits: &Limits{}, + } + require.NoError(t, yaml.Unmarshal([]byte(tc.yaml), overrides.defaultLimits)) + + actual := overrides.MetricAggregationEnabled("fake") + require.Equal(t, tc.expected, actual) + }) + } +}