diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 8081c058f89e..75b3e85749e7 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -3680,7 +3680,7 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v # list to service_name. If none of the configured labels exist in the stream, # label is set to unknown_service. Empty list disables setting the label. # CLI flag: -validation.discover-service-name -[discover_service_name: | default = [service app application name app_kubernetes_io_name container container_name component workload job]] +[discover_service_name: | default = [service app application name app_kubernetes_io_name container container_name k8s_container_name component workload job k8s_job_name]] # Discover and add log levels during ingestion, if not present already. Levels # would be added to Structured Metadata with name diff --git a/pkg/loghttp/push/otlp.go b/pkg/loghttp/push/otlp.go index a361bbbf196d..13aea9ee59ca 100644 --- a/pkg/loghttp/push/otlp.go +++ b/pkg/loghttp/push/otlp.go @@ -47,7 +47,7 @@ func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRe return nil, nil, err } - req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), tracker, stats) + req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats) return req, stats, nil } @@ -98,7 +98,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) { return req.Logs(), nil } -func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, tracker UsageTracker, stats *Stats) *logproto.PushRequest { +func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats) *logproto.PushRequest { if ld.LogRecordCount() == 0 { return &logproto.PushRequest{} } @@ -111,12 +111,14 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten res := rls.At(i).Resource() resAttrs := res.Attributes() - if v, ok := resAttrs.Get(attrServiceName); !ok || v.AsString() == "" { - resAttrs.PutStr(attrServiceName, "unknown_service") - } resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len()) streamLabels := make(model.LabelSet, 30) // we have a default labels limit of 30 so just initialize the map of same size + shouldDiscoverServiceName := len(discoverServiceName) > 0 && !stats.IsAggregatedMetric + hasServiceName := false + if v, ok := resAttrs.Get(attrServiceName); ok && v.AsString() != "" { + hasServiceName = true + } resAttrs.Range(func(k string, v pcommon.Value) bool { action := otlpConfig.ActionForResourceAttribute(k) if action == Drop { @@ -127,6 +129,16 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten if action == IndexLabel { for _, lbl := range attributeAsLabels { streamLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) + + if !hasServiceName && shouldDiscoverServiceName { + for _, labelName := range discoverServiceName { + if lbl.Name == labelName { + streamLabels[model.LabelName(LabelServiceName)] = model.LabelValue(lbl.Value) + hasServiceName = true + break + } + } + } } } else if action == StructuredMetadata { resourceAttributesAsStructuredMetadata = append(resourceAttributesAsStructuredMetadata, attributeAsLabels...) @@ -135,6 +147,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten return true }) + if !hasServiceName && shouldDiscoverServiceName { + streamLabels[model.LabelName(LabelServiceName)] = model.LabelValue(ServiceUnknown) + } + if err := streamLabels.Validate(); err != nil { stats.Errs = append(stats.Errs, fmt.Errorf("invalid labels: %w", err)) continue diff --git a/pkg/loghttp/push/otlp_test.go b/pkg/loghttp/push/otlp_test.go index bcdeb18d1706..e2ca137f274c 100644 --- a/pkg/loghttp/push/otlp_test.go +++ b/pkg/loghttp/push/otlp_test.go @@ -20,6 +20,20 @@ import ( func TestOTLPToLokiPushRequest(t *testing.T) { now := time.Unix(0, time.Now().UnixNano()) + defaultServiceDetection := []string{ + "service", + "app", + "application", + "name", + "app_kubernetes_io_name", + "container", + "container_name", + "k8s_container_name", + "component", + "workload", + "job", + "k8s_job_name", + } for _, tc := range []struct { name string @@ -346,7 +360,8 @@ func TestOTLPToLokiPushRequest(t *testing.T) { { Action: IndexLabel, Attributes: []string{"pod.name"}, - }, { + }, + { Action: IndexLabel, Regex: relabel.MustNewRegexp("service.*"), }, @@ -493,7 +508,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) { t.Run(tc.name, func(t *testing.T) { stats := newPushStats() tracker := NewMockTracker() - pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, tracker, stats) + pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, defaultServiceDetection, tracker, stats) require.Equal(t, tc.expectedPushRequest, *pushReq) require.Equal(t, tc.expectedStats, *stats) @@ -592,7 +607,6 @@ func TestOTLPLogToPushEntry(t *testing.T) { require.Equal(t, tc.expectedResp, otlpLogToPushEntry(tc.buildLogRecord(), DefaultOTLPConfig(defaultGlobalOTLPConfig))) }) } - } func TestAttributesToLabels(t *testing.T) { diff --git a/pkg/loghttp/push/push_test.go b/pkg/loghttp/push/push_test.go index 80e7c5e7eead..e63b2c873c8d 100644 --- a/pkg/loghttp/push/push_test.go +++ b/pkg/loghttp/push/push_test.go @@ -6,7 +6,9 @@ import ( "compress/gzip" "context" "fmt" + "io" "log" + "net/http" "net/http/httptest" "strings" "testing" @@ -16,6 +18,10 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/grafana/dskit/flagext" util_log "github.com/grafana/loki/v3/pkg/util/log" ) @@ -256,7 +262,7 @@ func TestParseRequest(t *testing.T) { } tracker := NewMockTracker() - data, err := ParseRequest(util_log.Logger, "fake", request, nil, &fakeLimits{test.enableServiceDiscovery}, ParseLokiRequest, tracker) + data, err := ParseRequest(util_log.Logger, "fake", request, nil, &fakeLimits{enabled: test.enableServiceDiscovery}, ParseLokiRequest, tracker) structuredMetadataBytesReceived := int(structuredMetadataBytesReceivedStats.Value()["total"].(int64)) - previousStructuredMetadataBytesReceived previousStructuredMetadataBytesReceived += structuredMetadataBytesReceived @@ -314,19 +320,124 @@ func TestParseRequest(t *testing.T) { } } +func Test_ServiceDetection(t *testing.T) { + tracker := NewMockTracker() + + createOtlpLogs := func(labels ...string) []byte { + now := time.Unix(0, time.Now().UnixNano()) + ld := plog.NewLogs() + for i := 0; i < len(labels); i += 2 { + ld.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr(labels[i], labels[i+1]) + } + ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("test body") + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).SetTimestamp(pcommon.Timestamp(now.UnixNano())) + + jsonMarshaller := plog.JSONMarshaler{} + body, err := jsonMarshaller.MarshalLogs(ld) + + require.NoError(t, err) + return body + } + + createRequest := func(path string, body io.Reader) *http.Request { + request := httptest.NewRequest( + "POST", + path, + body, + ) + request.Header.Add("Content-Type", "application/json") + + return request + } + + t.Run("detects servce from loki push requests", func(t *testing.T) { + body := `{"streams": [{ "stream": { "foo": "bar" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}` + request := createRequest("/loki/api/v1/push", strings.NewReader(body)) + + limits := &fakeLimits{enabled: true, labels: []string{"foo"}} + data, err := ParseRequest(util_log.Logger, "fake", request, nil, limits, ParseLokiRequest, tracker) + + require.NoError(t, err) + require.Equal(t, labels.FromStrings("foo", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels) + }) + + t.Run("detects servce from OTLP push requests using default indexing", func(t *testing.T) { + body := createOtlpLogs("k8s.job.name", "bar") + request := createRequest("/otlp/v1/push", bytes.NewReader(body)) + + limits := &fakeLimits{enabled: true} + data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker) + require.NoError(t, err) + require.Equal(t, labels.FromStrings("k8s_job_name", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels) + }) + + t.Run("detects servce from OTLP push requests using custom indexing", func(t *testing.T) { + body := createOtlpLogs("special", "sauce") + request := createRequest("/otlp/v1/push", bytes.NewReader(body)) + + limits := &fakeLimits{ + enabled: true, + labels: []string{"special"}, + indexAttributes: []string{"special"}, + } + data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker) + require.NoError(t, err) + require.Equal(t, labels.FromStrings("special", "sauce", LabelServiceName, "sauce").String(), data.Streams[0].Labels) + }) + + t.Run("only detects custom service label from indexed labels", func(t *testing.T) { + body := createOtlpLogs("special", "sauce") + request := createRequest("/otlp/v1/push", bytes.NewReader(body)) + + limits := &fakeLimits{ + enabled: true, + labels: []string{"special"}, + indexAttributes: []string{}, + } + data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker) + require.NoError(t, err) + require.Equal(t, labels.FromStrings(LabelServiceName, ServiceUnknown).String(), data.Streams[0].Labels) + }) +} + type fakeLimits struct { - enabled bool + enabled bool + labels []string + indexAttributes []string } -func (l *fakeLimits) OTLPConfig(_ string) OTLPConfig { - return OTLPConfig{} +func (f *fakeLimits) RetentionPeriodFor(_ string, _ labels.Labels) time.Duration { + return time.Hour } -func (l *fakeLimits) DiscoverServiceName(_ string) []string { - if !l.enabled { +func (f *fakeLimits) OTLPConfig(_ string) OTLPConfig { + if len(f.indexAttributes) > 0 { + return OTLPConfig{ + ResourceAttributes: ResourceAttributesConfig{ + AttributesConfig: []AttributesConfig{ + { + Action: IndexLabel, + Attributes: f.indexAttributes, + }, + }, + }, + } + } + + defaultGlobalOTLPConfig := GlobalOTLPConfig{} + flagext.DefaultValues(&defaultGlobalOTLPConfig) + return DefaultOTLPConfig(defaultGlobalOTLPConfig) +} + +func (f *fakeLimits) DiscoverServiceName(_ string) []string { + if !f.enabled { return nil } + if len(f.labels) > 0 { + return f.labels + } + return []string{ "service", "app", @@ -335,9 +446,11 @@ func (l *fakeLimits) DiscoverServiceName(_ string) []string { "app_kubernetes_io_name", "container", "container_name", + "k8s_container_name", "component", "workload", "job", + "k8s_job_name", } } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 3e44dc204777..75128607b882 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -263,9 +263,11 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { "app_kubernetes_io_name", "container", "container_name", + "k8s_container_name", "component", "workload", "job", + "k8s_job_name", } f.Var((*dskit_flagext.StringSlice)(&l.DiscoverServiceName), "validation.discover-service-name", "If no service_name label exists, Loki maps a single label from the configured list to service_name. If none of the configured labels exist in the stream, label is set to unknown_service. Empty list disables setting the label.") f.BoolVar(&l.DiscoverLogLevels, "validation.discover-log-levels", true, "Discover and add log levels during ingestion, if not present already. Levels would be added to Structured Metadata with name level/LEVEL/Level/Severity/severity/SEVERITY/lvl/LVL/Lvl (case-sensitive) and one of the values from 'trace', 'debug', 'info', 'warn', 'error', 'critical', 'fatal' (case insensitive).")