feat: move metric aggregation to a per-tenant config #14709

Merged: 1 commit, Nov 1, 2024
11 changes: 7 additions & 4 deletions docs/sources/shared/configuration.md
@@ -359,10 +359,6 @@ pattern_ingester:
# Configures the metric aggregation and storage behavior of the pattern
# ingester.
metric_aggregation:
# Whether the pattern ingester metric aggregation is enabled.
# CLI flag: -pattern-ingester.metric-aggregation.enabled
[enabled: <boolean> | default = false]

# How often to downsample metrics from raw push observations.
# CLI flag: -pattern-ingester.metric-aggregation.downsample-period
[downsample_period: <duration> | default = 10s]
@@ -3845,6 +3841,13 @@ otlp_config:
# CLI flag: -limits.ingestion-partition-tenant-shard-size
[ingestion_partitions_tenant_shard_size: <int> | default = 0]

# Enable metric aggregation. When enabled, pushed streams will be sampled for
# bytes and count, and these metrics will be written back into Loki as a special
# __aggregated_metric__ stream, which can be queried for faster histogram
# queries.
# CLI flag: -limits.metric-aggregation-enabled
[metric_aggregation_enabled: <boolean> | default = false]

# S3 server-side encryption type. Required to enable server-side encryption
# overrides for a specific tenant. If not set, the default S3 client settings
# are used.
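With this change, the on/off switch moves out of the pattern ingester's `metric_aggregation` block and into the tenant limits. A minimal sketch of the resulting YAML, assuming the standard `limits_config` and `pattern_ingester` blocks (values are illustrative):

```yaml
# Sketch only: the enable switch now lives under limits_config, while
# operational settings stay under the pattern ingester.
limits_config:
  metric_aggregation_enabled: true

pattern_ingester:
  enabled: true
  metric_aggregation:
    downsample_period: 10s
```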
1 change: 1 addition & 0 deletions pkg/loki/modules.go
@@ -680,6 +680,7 @@ func (t *Loki) initPatternIngesterTee() (services.Service, error) {

svc, err := pattern.NewTeeService(
t.Cfg.Pattern,
t.Overrides,
t.PatternRingClient,
t.Cfg.MetricsNamespace,
prometheus.DefaultRegisterer,
12 changes: 4 additions & 8 deletions pkg/pattern/aggregation/config.go
@@ -9,8 +9,6 @@ import (
)

type Config struct {
// TODO(twhitney): This needs to be a per-tenant config
Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester metric aggregation is enabled."`
DownsamplePeriod time.Duration `yaml:"downsample_period"`
LokiAddr string `yaml:"loki_address,omitempty" doc:"description=The address of the Loki instance to push aggregated metrics to."`
WriteTimeout time.Duration `yaml:"timeout,omitempty" doc:"description=The timeout for writing to Loki."`
@@ -27,12 +25,6 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
}

func (cfg *Config) RegisterFlagsWithPrefix(fs *flag.FlagSet, prefix string) {
fs.BoolVar(
&cfg.Enabled,
prefix+"metric-aggregation.enabled",
false,
"Flag to enable or disable metric aggregation.",
)
fs.DurationVar(
&cfg.DownsamplePeriod,
prefix+"metric-aggregation.downsample-period",
@@ -105,3 +97,7 @@ func (s *secretValue) Set(val string) error {
func (s *secretValue) Get() any { return string(*s) }

func (s *secretValue) String() string { return string(*s) }

type Limits interface {
MetricAggregationEnabled(userID string) bool
}
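The new `Limits` interface keeps the aggregation package decoupled from `pkg/validation`: anything that can answer a per-tenant lookup satisfies it. A minimal hypothetical sketch (the `staticLimits` type below is illustrative, not part of this PR):

```go
// staticLimits is a toy implementation of aggregation.Limits backed by a
// fixed per-tenant map; tenants absent from the map default to false.
type staticLimits struct {
	enabled map[string]bool
}

func (l staticLimits) MetricAggregationEnabled(userID string) bool {
	return l.enabled[userID]
}
```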
42 changes: 17 additions & 25 deletions pkg/pattern/ingester.go
@@ -150,6 +150,7 @@ func (cfg *Config) Validate() error {

type Limits interface {
drain.Limits
aggregation.Limits
}

type Ingester struct {
@@ -294,29 +295,18 @@ func (i *Ingester) loop() {
flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j)
defer flushTicker.Stop()

if i.cfg.MetricAggregation.Enabled {
downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod)
defer downsampleTicker.Stop()
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)
case t := <-downsampleTicker.C:
downsampleTicker.Reset(i.cfg.MetricAggregation.DownsamplePeriod)
now := model.TimeFromUnixNano(t.UnixNano())
i.downsampleMetrics(now)
case <-i.loopQuit:
return
}
}
} else {
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)
case <-i.loopQuit:
return
}
downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod)
defer downsampleTicker.Stop()
for {
select {
case <-flushTicker.C:
i.sweepUsers(false, true)
case t := <-downsampleTicker.C:
downsampleTicker.Reset(i.cfg.MetricAggregation.DownsamplePeriod)
now := model.TimeFromUnixNano(t.UnixNano())
i.downsampleMetrics(now)
case <-i.loopQuit:
return
}
}
}
@@ -401,7 +391,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
var writer aggregation.EntryWriter

aggCfg := i.cfg.MetricAggregation
if aggCfg.Enabled {
if i.limits.MetricAggregationEnabled(instanceID) {
writer, err = aggregation.NewPush(
aggCfg.LokiAddr,
instanceID,
@@ -469,6 +459,8 @@ func (i *Ingester) downsampleMetrics(ts model.Time) {
instances := i.getInstances()

for _, instance := range instances {
instance.Downsample(ts)
if i.limits.MetricAggregationEnabled(instance.instanceID) {
instance.Downsample(ts)
}
}
}
5 changes: 5 additions & 0 deletions pkg/pattern/ingester_test.go
@@ -341,8 +341,13 @@ func (m *mockEntryWriter) Stop() {

type fakeLimits struct {
Limits
metricAggregationEnabled bool
}

func (f *fakeLimits) PatternIngesterTokenizableJSONFields(_ string) []string {
return []string{"log", "message", "msg", "msg_", "_msg", "content"}
}

func (f *fakeLimits) MetricAggregationEnabled(_ string) bool {
return f.metricAggregationEnabled
}
12 changes: 8 additions & 4 deletions pkg/pattern/tee_service.go
@@ -27,6 +27,7 @@ import (

type TeeService struct {
cfg Config
limits Limits
logger log.Logger
ringClient RingClient
wg *sync.WaitGroup
@@ -48,6 +49,7 @@ type TeeService struct {

func NewTeeService(
cfg Config,
limits Limits,
ringClient RingClient,
metricsNamespace string,
registerer prometheus.Registerer,
@@ -83,6 +85,7 @@ func NewTeeService(
),
),
cfg: cfg,
limits: limits,
ringClient: ringClient,

wg: &sync.WaitGroup{},
@@ -317,14 +320,15 @@ func (ts *TeeService) sendBatch(ctx context.Context, clientRequest clientRequest
ts.ingesterAppends.WithLabelValues(clientRequest.ingesterAddr, "fail").Inc()
level.Error(ts.logger).Log("msg", "failed to send patterns to pattern ingester", "err", err)

if !ts.cfg.MetricAggregation.Enabled {
return err
}

// Pattern ingesters serve 2 functions, processing patterns and aggregating metrics.
// Only owned streams are processed for patterns, however any pattern ingester can
// aggregate metrics for any stream. Therefore, if we can't send the owned stream,
// try to forward request to any pattern ingester so we at least capture the metrics.

if !ts.limits.MetricAggregationEnabled(clientRequest.tenant) {
return err
}

replicationSet, err := ts.ringClient.Ring().
GetReplicationSetForOperation(ring.WriteNoExtend)
if err != nil || len(replicationSet.Instances) == 0 {
3 changes: 3 additions & 0 deletions pkg/pattern/tee_service_test.go
@@ -47,6 +47,9 @@ func getTestTee(t *testing.T) (*TeeService, *mockPoolClient) {

logsTee, err := NewTeeService(
cfg,
&fakeLimits{
metricAggregationEnabled: true,
},
ringClient,
"test",
nil,
16 changes: 14 additions & 2 deletions pkg/validation/limits.go
@@ -229,8 +229,9 @@ type Limits struct {
IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"`

PatternIngesterTokenizableJSONFieldsDefault dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_default" json:"pattern_ingester_tokenizable_json_fields_default" doc:"hidden"`
PatternIngesterTokenizableJSONFieldsAppend dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_append" json:"pattern_ingester_tokenizable_json_fields_append" doc:"hidden"`
PatternIngesterTokenizableJSONFieldsDelete dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_delete" json:"pattern_ingester_tokenizable_json_fields_delete" doc:"hidden"`
PatternIngesterTokenizableJSONFieldsAppend dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_append" json:"pattern_ingester_tokenizable_json_fields_append" doc:"hidden"`
PatternIngesterTokenizableJSONFieldsDelete dskit_flagext.StringSliceCSV `yaml:"pattern_ingester_tokenizable_json_fields_delete" json:"pattern_ingester_tokenizable_json_fields_delete" doc:"hidden"`
MetricAggregationEnabled bool `yaml:"metric_aggregation_enabled" json:"metric_aggregation_enabled"`

// This config doesn't have a CLI flag registered here because they're registered in
// their own original config struct.
@@ -437,6 +438,13 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.Var(&l.PatternIngesterTokenizableJSONFieldsDefault, "limits.pattern-ingester-tokenizable-json-fields", "List of JSON fields that should be tokenized in the pattern ingester.")
f.Var(&l.PatternIngesterTokenizableJSONFieldsAppend, "limits.pattern-ingester-tokenizable-json-fields-append", "List of JSON fields that should be appended to the default list of tokenizable fields in the pattern ingester.")
f.Var(&l.PatternIngesterTokenizableJSONFieldsDelete, "limits.pattern-ingester-tokenizable-json-fields-delete", "List of JSON fields that should be deleted from the (default U append) list of tokenizable fields in the pattern ingester.")

f.BoolVar(
&l.MetricAggregationEnabled,
"limits.metric-aggregation-enabled",
false,
Review comment (Contributor): Could/should we default this to true?

Reply (Collaborator, PR author): It would make the deployment easier. My big concern is the surprise for OSS users, as I'd still consider it a beta/experimental feature.
"Enable metric aggregation. When enabled, pushed streams will be sampled for bytes and count, and these metric will be written back into Loki as a special __aggregated_metric__ stream, which can be queried for faster histogram queries.",
)
}

// SetGlobalOTLPConfig set GlobalOTLPConfig which is used while unmarshaling per-tenant otlp config to use the default list of resource attributes picked as index labels.
@@ -1112,6 +1120,10 @@ func (o *Overrides) PatternIngesterTokenizableJSONFieldsDelete(userID string) []
return o.getOverridesForUser(userID).PatternIngesterTokenizableJSONFieldsDelete
}

func (o *Overrides) MetricAggregationEnabled(userID string) bool {
return o.getOverridesForUser(userID).MetricAggregationEnabled
}

// S3SSEType returns the per-tenant S3 SSE type.
func (o *Overrides) S3SSEType(user string) string {
return o.getOverridesForUser(user).S3SSEType
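Because the new `Overrides.MetricAggregationEnabled` lookup goes through `getOverridesForUser`, the limit can be toggled for individual tenants at runtime rather than globally. A sketch of a runtime overrides file, assuming Loki's standard `overrides:` layout (the tenant ID is illustrative):

```yaml
# Sketch only: keep the global default (false) but opt one tenant in.
overrides:
  "tenant-a":
    metric_aggregation_enabled: true
```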
33 changes: 33 additions & 0 deletions pkg/validation/limits_test.go
@@ -414,3 +414,36 @@ pattern_ingester_tokenizable_json_fields_delete: body
})
}
}

func Test_MetricAggregationEnabled(t *testing.T) {
for _, tc := range []struct {
name string
yaml string
expected bool
}{
{
name: "when true",
yaml: `
metric_aggregation_enabled: true
`,
expected: true,
},
{
name: "when false",
yaml: `
metric_aggregation_enabled: false
`,
expected: false,
},
} {
t.Run(tc.name, func(t *testing.T) {
overrides := Overrides{
defaultLimits: &Limits{},
}
require.NoError(t, yaml.Unmarshal([]byte(tc.yaml), overrides.defaultLimits))

actual := overrides.MetricAggregationEnabled("fake")
require.Equal(t, tc.expected, actual)
})
}
}