Skip to content

Commit

Permalink
feat: aggregate byte and count metrics (#13731)
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney authored Aug 16, 2024
1 parent ebf5f1d commit 913e9f9
Show file tree
Hide file tree
Showing 24 changed files with 2,651 additions and 234 deletions.
2 changes: 1 addition & 1 deletion cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pattern_ingester:
enabled: true
metric_aggregation:
enabled: true
log_push_observations: true
loki_address: localhost:3100

ruler:
alertmanager_url: http://localhost:9093
Expand Down
200 changes: 200 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,206 @@ pattern_ingester:
# CLI flag: -pattern-ingester.max-eviction-ratio
[max_eviction_ratio: <float> | default = 0.25]

# Configures the metric aggregation and storage behavior of the pattern
# ingester.
metric_aggregation:
# Whether the pattern ingester metric aggregation is enabled.
# CLI flag: -pattern-ingester.metric-aggregation.enabled
[enabled: <boolean> | default = false]

# How often to downsample metrics from raw push observations.
# CLI flag: -pattern-ingester.metric-aggregation.downsample-period
[downsample_period: <duration> | default = 10s]

# The address of the Loki instance to push aggregated metrics to.
# CLI flag: -pattern-ingester.metric-aggregation.loki-address
[loki_address: <string> | default = ""]

# The timeout for writing to Loki.
# CLI flag: -pattern-ingester.metric-aggregation.timeout
[timeout: <duration> | default = 10s]

# How long to wait in between pushes to Loki.
# CLI flag: -pattern-ingester.metric-aggregation.push-period
[push_period: <duration> | default = 30s]

# The HTTP client configuration for pushing metrics to Loki.
http_client_config:
basic_auth:
[username: <string> | default = ""]

[username_file: <string> | default = ""]

[username_ref: <string> | default = ""]

[password: <string> | default = ""]

[password_file: <string> | default = ""]

[password_ref: <string> | default = ""]

authorization:
[type: <string> | default = ""]

[credentials: <string> | default = ""]

[credentials_file: <string> | default = ""]

[credentials_ref: <string> | default = ""]

oauth2:
[client_id: <string> | default = ""]

[client_secret: <string> | default = ""]

[client_secret_file: <string> | default = ""]

[client_secret_ref: <string> | default = ""]

[scopes: <list of strings>]

[token_url: <string> | default = ""]

[endpoint_params: <map of string to string>]

tls_config:
[ca: <string> | default = ""]

[cert: <string> | default = ""]

[key: <string> | default = ""]

[ca_file: <string> | default = ""]

[cert_file: <string> | default = ""]

[key_file: <string> | default = ""]

[ca_ref: <string> | default = ""]

[cert_ref: <string> | default = ""]

[key_ref: <string> | default = ""]

[server_name: <string> | default = ""]

[insecure_skip_verify: <boolean>]

[min_version: <int>]

[max_version: <int>]

proxy_url:
[url: <url>]

[no_proxy: <string> | default = ""]

[proxy_from_environment: <boolean>]

[proxy_connect_header: <map of string to list of strings>]

[bearer_token: <string> | default = ""]

[bearer_token_file: <string> | default = ""]

tls_config:
[ca: <string> | default = ""]

[cert: <string> | default = ""]

[key: <string> | default = ""]

[ca_file: <string> | default = ""]

[cert_file: <string> | default = ""]

[key_file: <string> | default = ""]

[ca_ref: <string> | default = ""]

[cert_ref: <string> | default = ""]

[key_ref: <string> | default = ""]

[server_name: <string> | default = ""]

[insecure_skip_verify: <boolean>]

[min_version: <int>]

[max_version: <int>]

[follow_redirects: <boolean>]

[enable_http2: <boolean>]

proxy_url:
[url: <url>]

[no_proxy: <string> | default = ""]

[proxy_from_environment: <boolean>]

[proxy_connect_header: <map of string to list of strings>]

http_headers:
[: <map of string to Header>]

# Whether to use TLS for pushing metrics to Loki.
# CLI flag: -pattern-ingester.metric-aggregation.tls
[use_tls: <boolean> | default = false]

# The basic auth configuration for pushing metrics to Loki.
basic_auth:
# Basic auth username for sending aggregations back to Loki.
# CLI flag: -pattern-ingester.metric-aggregation.basic-auth.username
[username: <string> | default = ""]

# Basic auth password for sending aggregations back to Loki.
# CLI flag: -pattern-ingester.metric-aggregation.basic-auth.password
[password: <string> | default = ""]

# The backoff configuration for pushing metrics to Loki.
backoff_config:
# Minimum delay when backing off.
# CLI flag: -pattern-ingester.metric-aggregation.backoff-min-period
[min_period: <duration> | default = 100ms]

# Maximum delay when backing off.
# CLI flag: -pattern-ingester.metric-aggregation.backoff-max-period
[max_period: <duration> | default = 10s]

# Number of times to backoff and retry before failing.
# CLI flag: -pattern-ingester.metric-aggregation.backoff-retries
[max_retries: <int> | default = 10]

# Configures the pattern tee which forwards requests to the pattern ingester.
tee_config:
# The size of the batch of raw logs to send for template mining
# CLI flag: -pattern-ingester.tee.batch-size
[batch_size: <int> | default = 5000]

# The max time between batches of raw logs to send for template mining
# CLI flag: -pattern-ingester.tee.batch-flush-interval
[batch_flush_interval: <duration> | default = 1s]

# The number of log flushes to queue before dropping
# CLI flag: -pattern-ingester.tee.flush-queue-size
[flush_queue_size: <int> | default = 1000]

# the number of concurrent workers sending logs to the template service
# CLI flag: -pattern-ingester.tee.flush-worker-count
[flush_worker_count: <int> | default = 100]

# The max time we will try to flush any remaining logs to be mined when the
# service is stopped
# CLI flag: -pattern-ingester.tee.stop-flush-timeout
[stop_flush_timeout: <duration> | default = 30s]

# Timeout for connections between the Loki and the pattern ingester.
# CLI flag: -pattern-ingester.connection-timeout
[connection_timeout: <duration> | default = 2s]

# The index_gateway block configures the Loki index gateway server, responsible
# for serving index queries without the need to constantly interact with the
# object store.
Expand Down
58 changes: 24 additions & 34 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,6 @@ const (
ringKey = "distributor"

ringAutoForgetUnhealthyPeriods = 2

levelLabel = "detected_level"
logLevelDebug = "debug"
logLevelInfo = "info"
logLevelWarn = "warn"
logLevelError = "error"
logLevelFatal = "fatal"
logLevelCritical = "critical"
logLevelTrace = "trace"
logLevelUnknown = "unknown"
)

var (
Expand Down Expand Up @@ -406,9 +396,9 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
} else {
logLevel = detectLogLevelFromLogEntry(entry, structuredMetadata)
}
if logLevel != logLevelUnknown && logLevel != "" {
if logLevel != constants.LogLevelUnknown && logLevel != "" {
entry.StructuredMetadata = append(entry.StructuredMetadata, logproto.LabelAdapter{
Name: levelLabel,
Name: constants.LevelLabel,
Value: logLevel,
})
}
Expand Down Expand Up @@ -883,24 +873,24 @@ func detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels.
if otlpSeverityNumberTxt := structuredMetadata.Get(push.OTLPSeverityNumber); otlpSeverityNumberTxt != "" {
otlpSeverityNumber, err := strconv.Atoi(otlpSeverityNumberTxt)
if err != nil {
return logLevelInfo
return constants.LogLevelInfo
}
if otlpSeverityNumber == int(plog.SeverityNumberUnspecified) {
return logLevelUnknown
return constants.LogLevelUnknown
} else if otlpSeverityNumber <= int(plog.SeverityNumberTrace4) {
return logLevelTrace
return constants.LogLevelTrace
} else if otlpSeverityNumber <= int(plog.SeverityNumberDebug4) {
return logLevelDebug
return constants.LogLevelDebug
} else if otlpSeverityNumber <= int(plog.SeverityNumberInfo4) {
return logLevelInfo
return constants.LogLevelInfo
} else if otlpSeverityNumber <= int(plog.SeverityNumberWarn4) {
return logLevelWarn
return constants.LogLevelWarn
} else if otlpSeverityNumber <= int(plog.SeverityNumberError4) {
return logLevelError
return constants.LogLevelError
} else if otlpSeverityNumber <= int(plog.SeverityNumberFatal4) {
return logLevelFatal
return constants.LogLevelFatal
}
return logLevelUnknown
return constants.LogLevelUnknown
}

return extractLogLevelFromLogLine(entry.Line)
Expand All @@ -917,19 +907,19 @@ func extractLogLevelFromLogLine(log string) string {

switch {
case bytes.EqualFold(v, []byte("trace")), bytes.EqualFold(v, []byte("trc")):
return logLevelTrace
return constants.LogLevelTrace
case bytes.EqualFold(v, []byte("debug")), bytes.EqualFold(v, []byte("dbg")):
return logLevelDebug
return constants.LogLevelDebug
case bytes.EqualFold(v, []byte("info")), bytes.EqualFold(v, []byte("inf")):
return logLevelInfo
return constants.LogLevelInfo
case bytes.EqualFold(v, []byte("warn")), bytes.EqualFold(v, []byte("wrn")):
return logLevelWarn
return constants.LogLevelWarn
case bytes.EqualFold(v, []byte("error")), bytes.EqualFold(v, []byte("err")):
return logLevelError
return constants.LogLevelError
case bytes.EqualFold(v, []byte("critical")):
return logLevelCritical
return constants.LogLevelCritical
case bytes.EqualFold(v, []byte("fatal")):
return logLevelFatal
return constants.LogLevelFatal
default:
return detectLevelFromLogLine(log)
}
Expand Down Expand Up @@ -984,21 +974,21 @@ func isJSON(line string) bool {
func detectLevelFromLogLine(log string) string {
if strings.Contains(log, "info:") || strings.Contains(log, "INFO:") ||
strings.Contains(log, "info") || strings.Contains(log, "INFO") {
return logLevelInfo
return constants.LogLevelInfo
}
if strings.Contains(log, "err:") || strings.Contains(log, "ERR:") ||
strings.Contains(log, "error") || strings.Contains(log, "ERROR") {
return logLevelError
return constants.LogLevelError
}
if strings.Contains(log, "warn:") || strings.Contains(log, "WARN:") ||
strings.Contains(log, "warning") || strings.Contains(log, "WARNING") {
return logLevelWarn
return constants.LogLevelWarn
}
if strings.Contains(log, "CRITICAL:") || strings.Contains(log, "critical:") {
return logLevelCritical
return constants.LogLevelCritical
}
if strings.Contains(log, "debug:") || strings.Contains(log, "DEBUG:") {
return logLevelDebug
return constants.LogLevelDebug
}
return logLevelUnknown
return constants.LogLevelUnknown
}
Loading

0 comments on commit 913e9f9

Please sign in to comment.