Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: aggregate byte and count metrics #13731

Merged
merged 24 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7842180
Squashed commit of the following:
trevorwhitney Aug 12, 2024
41e36e4
fix: merge whoops
trevorwhitney Aug 12, 2024
553731b
null check tee in distributor
trevorwhitney Aug 12, 2024
17bb6de
fix: broken import
trevorwhitney Aug 12, 2024
5a3c861
fix: queue size labels
trevorwhitney Aug 12, 2024
3ee7945
move context closer to push
trevorwhitney Aug 12, 2024
8b567c3
slight reduction of allocations
trevorwhitney Aug 12, 2024
8de6bd4
fix: level detection
trevorwhitney Aug 12, 2024
2860dc0
feat: don't tee aggregated metric lines to pattern ingester
trevorwhitney Aug 13, 2024
84d88bf
feat: convert pattern tee to a service
trevorwhitney Aug 14, 2024
78a2066
chore: add error logging to metric appends
trevorwhitney Aug 14, 2024
ca6338f
feat: remove parsing labels from observe
trevorwhitney Aug 14, 2024
bfaf8d9
fix: nil check invalid level
trevorwhitney Aug 15, 2024
cd20b3b
feat: downcase level string
trevorwhitney Aug 15, 2024
b708cbb
fix: only write agg metrics we have values for
trevorwhitney Aug 15, 2024
67164ab
test: add tee service test
trevorwhitney Aug 15, 2024
1158d8a
Merge branch 'main' into store-aggregated-metrics-in-loki-2
trevorwhitney Aug 15, 2024
dd41222
docs: fix configuration reference
trevorwhitney Aug 15, 2024
826edc5
chore: lint and cleanup
trevorwhitney Aug 15, 2024
2b89765
fix: format and mock
trevorwhitney Aug 16, 2024
562766b
Merge branch 'main' into store-aggregated-metrics-in-loki-2
trevorwhitney Aug 16, 2024
3277b34
chore: move log levels into constants
trevorwhitney Aug 16, 2024
31b620d
chore: cleanup
trevorwhitney Aug 16, 2024
391c683
Merge branch 'main' into store-aggregated-metrics-in-loki-2
trevorwhitney Aug 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pattern_ingester:
enabled: true
metric_aggregation:
enabled: true
log_push_observations: true
loki_address: localhost:3100

ruler:
alertmanager_url: http://localhost:9093
Expand Down
200 changes: 200 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,206 @@ pattern_ingester:
# CLI flag: -pattern-ingester.max-eviction-ratio
[max_eviction_ratio: <float> | default = 0.25]

# Configures the metric aggregation and storage behavior of the pattern
# ingester.
metric_aggregation:
# Whether the pattern ingester metric aggregation is enabled.
# CLI flag: -pattern-ingester.metric-aggregation.enabled
[enabled: <boolean> | default = false]

# How often to downsample metrics from raw push observations.
# CLI flag: -pattern-ingester.metric-aggregation.downsample-period
[downsample_period: <duration> | default = 10s]

# The address of the Loki instance to push aggregated metrics to.
# CLI flag: -pattern-ingester.metric-aggregation.loki-address
[loki_address: <string> | default = ""]

# The timeout for writing to Loki.
# CLI flag: -pattern-ingester.metric-aggregation.timeout
[timeout: <duration> | default = 10s]

# How long to wait in between pushes to Loki.
# CLI flag: -pattern-ingester.metric-aggregation.push-period
[push_period: <duration> | default = 30s]

# The HTTP client configuration for pushing metrics to Loki.
http_client_config:
basic_auth:
[username: <string> | default = ""]

[username_file: <string> | default = ""]

[username_ref: <string> | default = ""]

[password: <string> | default = ""]

[password_file: <string> | default = ""]

[password_ref: <string> | default = ""]

authorization:
[type: <string> | default = ""]

[credentials: <string> | default = ""]

[credentials_file: <string> | default = ""]

[credentials_ref: <string> | default = ""]

oauth2:
[client_id: <string> | default = ""]

[client_secret: <string> | default = ""]

[client_secret_file: <string> | default = ""]

[client_secret_ref: <string> | default = ""]

[scopes: <list of strings>]

[token_url: <string> | default = ""]

[endpoint_params: <map of string to string>]

tls_config:
[ca: <string> | default = ""]

[cert: <string> | default = ""]

[key: <string> | default = ""]

[ca_file: <string> | default = ""]

[cert_file: <string> | default = ""]

[key_file: <string> | default = ""]

[ca_ref: <string> | default = ""]

[cert_ref: <string> | default = ""]

[key_ref: <string> | default = ""]

[server_name: <string> | default = ""]

[insecure_skip_verify: <boolean>]

[min_version: <int>]

[max_version: <int>]

proxy_url:
[url: <url>]

[no_proxy: <string> | default = ""]

[proxy_from_environment: <boolean>]

[proxy_connect_header: <map of string to list of strings>]

[bearer_token: <string> | default = ""]

[bearer_token_file: <string> | default = ""]

tls_config:
[ca: <string> | default = ""]

[cert: <string> | default = ""]

[key: <string> | default = ""]

[ca_file: <string> | default = ""]

[cert_file: <string> | default = ""]

[key_file: <string> | default = ""]

[ca_ref: <string> | default = ""]

[cert_ref: <string> | default = ""]

[key_ref: <string> | default = ""]

[server_name: <string> | default = ""]

[insecure_skip_verify: <boolean>]

[min_version: <int>]

[max_version: <int>]

[follow_redirects: <boolean>]

[enable_http2: <boolean>]

proxy_url:
[url: <url>]

[no_proxy: <string> | default = ""]

[proxy_from_environment: <boolean>]

[proxy_connect_header: <map of string to list of strings>]

http_headers:
[: <map of string to Header>]

# Whether to use TLS for pushing metrics to Loki.
# CLI flag: -pattern-ingester.metric-aggregation.tls
[use_tls: <boolean> | default = false]

# The basic auth configuration for pushing metrics to Loki.
basic_auth:
# Basic auth username for sending aggregations back to Loki.
# CLI flag: -pattern-ingester.metric-aggregation.basic-auth.username
[username: <string> | default = ""]

# Basic auth password for sending aggregations back to Loki.
# CLI flag: -pattern-ingester.metric-aggregation.basic-auth.password
[password: <string> | default = ""]

# The backoff configuration for pushing metrics to Loki.
backoff_config:
# Minimum delay when backing off.
# CLI flag: -pattern-ingester.metric-aggregation.backoff-min-period
[min_period: <duration> | default = 100ms]

# Maximum delay when backing off.
# CLI flag: -pattern-ingester.metric-aggregation.backoff-max-period
[max_period: <duration> | default = 10s]

# Number of times to back off and retry before failing.
# CLI flag: -pattern-ingester.metric-aggregation.backoff-retries
[max_retries: <int> | default = 10]

# Configures the pattern tee which forwards requests to the pattern ingester.
tee_config:
# The size of the batch of raw logs to send for template mining
# CLI flag: -pattern-ingester.tee.batch-size
[batch_size: <int> | default = 5000]

# The max time between batches of raw logs to send for template mining
# CLI flag: -pattern-ingester.tee.batch-flush-interval
[batch_flush_interval: <duration> | default = 1s]

# The number of log flushes to queue before dropping
# CLI flag: -pattern-ingester.tee.flush-queue-size
[flush_queue_size: <int> | default = 1000]

# The number of concurrent workers sending logs to the template service
# CLI flag: -pattern-ingester.tee.flush-worker-count
[flush_worker_count: <int> | default = 100]

# The max time we will try to flush any remaining logs to be mined when the
# service is stopped
# CLI flag: -pattern-ingester.tee.stop-flush-timeout
[stop_flush_timeout: <duration> | default = 30s]

# Timeout for connections between Loki and the pattern ingester.
# CLI flag: -pattern-ingester.connection-timeout
[connection_timeout: <duration> | default = 2s]

# The index_gateway block configures the Loki index gateway server, responsible
# for serving index queries without the need to constantly interact with the
# object store.
Expand Down
58 changes: 24 additions & 34 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,6 @@ const (
ringKey = "distributor"

ringAutoForgetUnhealthyPeriods = 2

levelLabel = "detected_level"
logLevelDebug = "debug"
logLevelInfo = "info"
logLevelWarn = "warn"
logLevelError = "error"
logLevelFatal = "fatal"
logLevelCritical = "critical"
logLevelTrace = "trace"
logLevelUnknown = "unknown"
)

var (
Expand Down Expand Up @@ -406,9 +396,9 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
} else {
logLevel = detectLogLevelFromLogEntry(entry, structuredMetadata)
}
if logLevel != logLevelUnknown && logLevel != "" {
if logLevel != constants.LogLevelUnknown && logLevel != "" {
entry.StructuredMetadata = append(entry.StructuredMetadata, logproto.LabelAdapter{
Name: levelLabel,
Name: constants.LevelLabel,
Value: logLevel,
})
}
Expand Down Expand Up @@ -883,24 +873,24 @@ func detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels.
if otlpSeverityNumberTxt := structuredMetadata.Get(push.OTLPSeverityNumber); otlpSeverityNumberTxt != "" {
otlpSeverityNumber, err := strconv.Atoi(otlpSeverityNumberTxt)
if err != nil {
return logLevelInfo
return constants.LogLevelInfo
}
if otlpSeverityNumber == int(plog.SeverityNumberUnspecified) {
return logLevelUnknown
return constants.LogLevelUnknown
} else if otlpSeverityNumber <= int(plog.SeverityNumberTrace4) {
return logLevelTrace
return constants.LogLevelTrace
} else if otlpSeverityNumber <= int(plog.SeverityNumberDebug4) {
return logLevelDebug
return constants.LogLevelDebug
} else if otlpSeverityNumber <= int(plog.SeverityNumberInfo4) {
return logLevelInfo
return constants.LogLevelInfo
} else if otlpSeverityNumber <= int(plog.SeverityNumberWarn4) {
return logLevelWarn
return constants.LogLevelWarn
} else if otlpSeverityNumber <= int(plog.SeverityNumberError4) {
return logLevelError
return constants.LogLevelError
} else if otlpSeverityNumber <= int(plog.SeverityNumberFatal4) {
return logLevelFatal
return constants.LogLevelFatal
}
return logLevelUnknown
return constants.LogLevelUnknown
}

return extractLogLevelFromLogLine(entry.Line)
Expand All @@ -917,19 +907,19 @@ func extractLogLevelFromLogLine(log string) string {

switch {
case bytes.EqualFold(v, []byte("trace")), bytes.EqualFold(v, []byte("trc")):
return logLevelTrace
return constants.LogLevelTrace
case bytes.EqualFold(v, []byte("debug")), bytes.EqualFold(v, []byte("dbg")):
return logLevelDebug
return constants.LogLevelDebug
case bytes.EqualFold(v, []byte("info")), bytes.EqualFold(v, []byte("inf")):
return logLevelInfo
return constants.LogLevelInfo
case bytes.EqualFold(v, []byte("warn")), bytes.EqualFold(v, []byte("wrn")):
return logLevelWarn
return constants.LogLevelWarn
case bytes.EqualFold(v, []byte("error")), bytes.EqualFold(v, []byte("err")):
return logLevelError
return constants.LogLevelError
case bytes.EqualFold(v, []byte("critical")):
return logLevelCritical
return constants.LogLevelCritical
case bytes.EqualFold(v, []byte("fatal")):
return logLevelFatal
return constants.LogLevelFatal
default:
return detectLevelFromLogLine(log)
}
Expand Down Expand Up @@ -984,21 +974,21 @@ func isJSON(line string) bool {
// detectLevelFromLogLine guesses a log level by searching the raw log line for
// well-known level keywords and returns the matching level string. When no
// keyword is found it returns the "unknown" level.
//
// The keyword groups are checked in a fixed order (info, error, warn,
// critical, debug), so the first group that matches wins when a line contains
// several keywords. strings.Contains is case-sensitive, so only the
// all-lowercase and all-uppercase spellings listed below are recognized.
//
// NOTE(review): this span appears to be rendered from a diff view, so each
// `return logLevel…` line is the removed statement and the
// `return constants.LogLevel…` line directly after it is its replacement.
func detectLevelFromLogLine(log string) string {
// The bare "info"/"INFO" substrings also cover the "info:"/"INFO:" forms.
if strings.Contains(log, "info:") || strings.Contains(log, "INFO:") ||
strings.Contains(log, "info") || strings.Contains(log, "INFO") {
return logLevelInfo
return constants.LogLevelInfo
}
// Short form requires a trailing colon; the full word matches anywhere.
if strings.Contains(log, "err:") || strings.Contains(log, "ERR:") ||
strings.Contains(log, "error") || strings.Contains(log, "ERROR") {
return logLevelError
return constants.LogLevelError
}
// "warn" must be followed by a colon, or appear as part of "warning".
if strings.Contains(log, "warn:") || strings.Contains(log, "WARN:") ||
strings.Contains(log, "warning") || strings.Contains(log, "WARNING") {
return logLevelWarn
return constants.LogLevelWarn
}
// Critical is only recognized with a trailing colon.
if strings.Contains(log, "CRITICAL:") || strings.Contains(log, "critical:") {
return logLevelCritical
return constants.LogLevelCritical
}
// Debug is only recognized with a trailing colon.
if strings.Contains(log, "debug:") || strings.Contains(log, "DEBUG:") {
return logLevelDebug
return constants.LogLevelDebug
}
// No known keyword was found in the line.
return logLevelUnknown
return constants.LogLevelUnknown
}
Loading
Loading