Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: enable service detection for otlp endoint #14036

Merged
merged 14 commits into from
Sep 4, 2024
Merged
2 changes: 1 addition & 1 deletion docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3680,7 +3680,7 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
# list to service_name. If none of the configured labels exist in the stream,
# label is set to unknown_service. Empty list disables setting the label.
# CLI flag: -validation.discover-service-name
[discover_service_name: <list of strings> | default = [service app application name app_kubernetes_io_name container container_name component workload job]]
[discover_service_name: <list of strings> | default = [service app application name app_kubernetes_io_name container container_name k8s_container_name component workload job k8s_job_name]]

# Discover and add log levels during ingestion, if not present already. Levels
# would be added to Structured Metadata with name
Expand Down
26 changes: 21 additions & 5 deletions pkg/loghttp/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRe
return nil, nil, err
}

req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), tracker, stats)
req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats)
return req, stats, nil
}

Expand Down Expand Up @@ -98,7 +98,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) {
return req.Logs(), nil
}

func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, tracker UsageTracker, stats *Stats) *logproto.PushRequest {
func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats) *logproto.PushRequest {
if ld.LogRecordCount() == 0 {
return &logproto.PushRequest{}
}
Expand All @@ -111,12 +111,14 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
res := rls.At(i).Resource()
resAttrs := res.Attributes()

if v, ok := resAttrs.Get(attrServiceName); !ok || v.AsString() == "" {
resAttrs.PutStr(attrServiceName, "unknown_service")
}
resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len())
streamLabels := make(model.LabelSet, 30) // we have a default labels limit of 30 so just initialize the map of same size

shouldDiscoverServiceName := len(discoverServiceName) > 0 && !stats.IsAggregatedMetric
hasServiceName := false
if v, ok := resAttrs.Get(attrServiceName); ok && v.AsString() != "" {
hasServiceName = true
}
resAttrs.Range(func(k string, v pcommon.Value) bool {
action := otlpConfig.ActionForResourceAttribute(k)
if action == Drop {
Expand All @@ -127,6 +129,16 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
if action == IndexLabel {
for _, lbl := range attributeAsLabels {
streamLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)

if !hasServiceName && shouldDiscoverServiceName {
for _, labelName := range discoverServiceName {
if lbl.Name == labelName {
trevorwhitney marked this conversation as resolved.
Show resolved Hide resolved
streamLabels[model.LabelName(LabelServiceName)] = model.LabelValue(lbl.Value)
hasServiceName = true
break
}
}
}
}
} else if action == StructuredMetadata {
resourceAttributesAsStructuredMetadata = append(resourceAttributesAsStructuredMetadata, attributeAsLabels...)
Expand All @@ -135,6 +147,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
return true
})

if !hasServiceName && shouldDiscoverServiceName {
streamLabels[model.LabelName(LabelServiceName)] = model.LabelValue(ServiceUnknown)
}

if err := streamLabels.Validate(); err != nil {
stats.Errs = append(stats.Errs, fmt.Errorf("invalid labels: %w", err))
continue
Expand Down
20 changes: 17 additions & 3 deletions pkg/loghttp/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,20 @@ import (

func TestOTLPToLokiPushRequest(t *testing.T) {
now := time.Unix(0, time.Now().UnixNano())
defaultServiceDetection := []string{
"service",
"app",
"application",
"name",
"app_kubernetes_io_name",
"container",
"container_name",
"k8s_container_name",
"component",
"workload",
"job",
"k8s_job_name",
}

for _, tc := range []struct {
name string
Expand Down Expand Up @@ -346,7 +360,8 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
{
Action: IndexLabel,
Attributes: []string{"pod.name"},
}, {
},
{
Action: IndexLabel,
Regex: relabel.MustNewRegexp("service.*"),
},
Expand Down Expand Up @@ -493,7 +508,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
stats := newPushStats()
tracker := NewMockTracker()
pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, tracker, stats)
pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, defaultServiceDetection, tracker, stats)
require.Equal(t, tc.expectedPushRequest, *pushReq)
require.Equal(t, tc.expectedStats, *stats)

Expand Down Expand Up @@ -592,7 +607,6 @@ func TestOTLPLogToPushEntry(t *testing.T) {
require.Equal(t, tc.expectedResp, otlpLogToPushEntry(tc.buildLogRecord(), DefaultOTLPConfig(defaultGlobalOTLPConfig)))
})
}

}

func TestAttributesToLabels(t *testing.T) {
Expand Down
125 changes: 119 additions & 6 deletions pkg/loghttp/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"compress/gzip"
"context"
"fmt"
"io"
"log"
"net/http"
"net/http/httptest"
"strings"
"testing"
Expand All @@ -16,6 +18,10 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/grafana/dskit/flagext"

util_log "github.com/grafana/loki/v3/pkg/util/log"
)
Expand Down Expand Up @@ -256,7 +262,7 @@ func TestParseRequest(t *testing.T) {
}

tracker := NewMockTracker()
data, err := ParseRequest(util_log.Logger, "fake", request, nil, &fakeLimits{test.enableServiceDiscovery}, ParseLokiRequest, tracker)
data, err := ParseRequest(util_log.Logger, "fake", request, nil, &fakeLimits{enabled: test.enableServiceDiscovery}, ParseLokiRequest, tracker)

structuredMetadataBytesReceived := int(structuredMetadataBytesReceivedStats.Value()["total"].(int64)) - previousStructuredMetadataBytesReceived
previousStructuredMetadataBytesReceived += structuredMetadataBytesReceived
Expand Down Expand Up @@ -314,19 +320,124 @@ func TestParseRequest(t *testing.T) {
}
}

func Test_ServiceDetection(t *testing.T) {
tracker := NewMockTracker()

createOtlpLogs := func(labels ...string) []byte {
now := time.Unix(0, time.Now().UnixNano())
ld := plog.NewLogs()
for i := 0; i < len(labels); i += 2 {
ld.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr(labels[i], labels[i+1])
}
ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("test body")
ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).SetTimestamp(pcommon.Timestamp(now.UnixNano()))

jsonMarshaller := plog.JSONMarshaler{}
body, err := jsonMarshaller.MarshalLogs(ld)

require.NoError(t, err)
return body
}

createRequest := func(path string, body io.Reader) *http.Request {
request := httptest.NewRequest(
"POST",
path,
body,
)
request.Header.Add("Content-Type", "application/json")

return request
}

t.Run("detects servce from loki push requests", func(t *testing.T) {
body := `{"streams": [{ "stream": { "foo": "bar" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`
request := createRequest("/loki/api/v1/push", strings.NewReader(body))

limits := &fakeLimits{enabled: true, labels: []string{"foo"}}
data, err := ParseRequest(util_log.Logger, "fake", request, nil, limits, ParseLokiRequest, tracker)

require.NoError(t, err)
require.Equal(t, labels.FromStrings("foo", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
})

t.Run("detects servce from OTLP push requests using default indexing", func(t *testing.T) {
body := createOtlpLogs("k8s.job.name", "bar")
request := createRequest("/otlp/v1/push", bytes.NewReader(body))

limits := &fakeLimits{enabled: true}
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
require.NoError(t, err)
require.Equal(t, labels.FromStrings("k8s_job_name", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
})

t.Run("detects servce from OTLP push requests using custom indexing", func(t *testing.T) {
trevorwhitney marked this conversation as resolved.
Show resolved Hide resolved
body := createOtlpLogs("special", "sauce")
request := createRequest("/otlp/v1/push", bytes.NewReader(body))

limits := &fakeLimits{
enabled: true,
labels: []string{"special"},
indexAttributes: []string{"special"},
}
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
require.NoError(t, err)
require.Equal(t, labels.FromStrings("special", "sauce", LabelServiceName, "sauce").String(), data.Streams[0].Labels)
})

t.Run("only detects custom service label from indexed labels", func(t *testing.T) {
body := createOtlpLogs("special", "sauce")
request := createRequest("/otlp/v1/push", bytes.NewReader(body))

limits := &fakeLimits{
enabled: true,
labels: []string{"special"},
indexAttributes: []string{},
}
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
require.NoError(t, err)
require.Equal(t, labels.FromStrings(LabelServiceName, ServiceUnknown).String(), data.Streams[0].Labels)
})
}

type fakeLimits struct {
enabled bool
enabled bool
labels []string
indexAttributes []string
}

func (l *fakeLimits) OTLPConfig(_ string) OTLPConfig {
return OTLPConfig{}
func (f *fakeLimits) RetentionPeriodFor(_ string, _ labels.Labels) time.Duration {
return time.Hour
}

func (l *fakeLimits) DiscoverServiceName(_ string) []string {
if !l.enabled {
func (f *fakeLimits) OTLPConfig(_ string) OTLPConfig {
if len(f.indexAttributes) > 0 {
return OTLPConfig{
ResourceAttributes: ResourceAttributesConfig{
AttributesConfig: []AttributesConfig{
{
Action: IndexLabel,
Attributes: f.indexAttributes,
},
},
},
}
}

defaultGlobalOTLPConfig := GlobalOTLPConfig{}
flagext.DefaultValues(&defaultGlobalOTLPConfig)
return DefaultOTLPConfig(defaultGlobalOTLPConfig)
}

func (f *fakeLimits) DiscoverServiceName(_ string) []string {
if !f.enabled {
return nil
}

if len(f.labels) > 0 {
return f.labels
}

return []string{
"service",
"app",
Expand All @@ -335,9 +446,11 @@ func (l *fakeLimits) DiscoverServiceName(_ string) []string {
"app_kubernetes_io_name",
"container",
"container_name",
"k8s_container_name",
"component",
"workload",
"job",
"k8s_job_name",
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,11 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
"app_kubernetes_io_name",
"container",
"container_name",
"k8s_container_name",
"component",
"workload",
"job",
"k8s_job_name",
}
f.Var((*dskit_flagext.StringSlice)(&l.DiscoverServiceName), "validation.discover-service-name", "If no service_name label exists, Loki maps a single label from the configured list to service_name. If none of the configured labels exist in the stream, label is set to unknown_service. Empty list disables setting the label.")
f.BoolVar(&l.DiscoverLogLevels, "validation.discover-log-levels", true, "Discover and add log levels during ingestion, if not present already. Levels would be added to Structured Metadata with name level/LEVEL/Level/Severity/severity/SEVERITY/lvl/LVL/Lvl (case-sensitive) and one of the values from 'trace', 'debug', 'info', 'warn', 'error', 'critical', 'fatal' (case insensitive).")
Expand Down
Loading