From d7ff42664681794b9ef5026ac3758cdd9569ac1a Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Wed, 2 Oct 2024 09:17:38 -0600 Subject: [PATCH] feat: ability to log stream selectors before service name detection (#14154) --- .../promtail/targets/lokipush/pushtarget.go | 2 +- pkg/distributor/http.go | 5 +-- pkg/distributor/http_test.go | 11 ++++++- pkg/loghttp/push/otlp.go | 33 +++++++++++++++++-- pkg/loghttp/push/otlp_test.go | 14 +++++++- pkg/loghttp/push/push.go | 23 ++++++++++--- pkg/loghttp/push/push_test.go | 19 ++++++++--- 7 files changed, 89 insertions(+), 18 deletions(-) diff --git a/clients/pkg/promtail/targets/lokipush/pushtarget.go b/clients/pkg/promtail/targets/lokipush/pushtarget.go index 1ec021c0b28a8..f6e33eb8f72d9 100644 --- a/clients/pkg/promtail/targets/lokipush/pushtarget.go +++ b/clients/pkg/promtail/targets/lokipush/pushtarget.go @@ -111,7 +111,7 @@ func (t *PushTarget) run() error { func (t *PushTarget) handleLoki(w http.ResponseWriter, r *http.Request) { logger := util_log.WithContext(r.Context(), util_log.Logger) userID, _ := tenant.TenantID(r.Context()) - req, err := push.ParseRequest(logger, userID, r, nil, push.EmptyLimits{}, push.ParseLokiRequest, nil) + req, err := push.ParseRequest(logger, userID, r, nil, push.EmptyLimits{}, push.ParseLokiRequest, nil, false) if err != nil { level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index ec0660b91bc01..636a16bb507b1 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -58,7 +58,8 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe pushRequestParser = d.RequestParserWrapper(pushRequestParser) } - req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser, d.usageTracker) + logPushRequestStreams := d.tenantConfigs.LogPushRequestStreams(tenantID) + req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser, d.usageTracker, logPushRequestStreams) if err != nil { if d.tenantConfigs.LogPushRequest(tenantID) { level.Debug(logger).Log( @@ -73,7 +74,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe return } - if d.tenantConfigs.LogPushRequestStreams(tenantID) { + if logPushRequestStreams { var sb strings.Builder for _, s := range req.Streams { sb.WriteString(s.Labels) diff --git a/pkg/distributor/http_test.go b/pkg/distributor/http_test.go index b6281b81bf3d7..c6b8f3d017af6 100644 --- a/pkg/distributor/http_test.go +++ b/pkg/distributor/http_test.go @@ -7,6 +7,7 @@ import ( "net/http/httptest" "testing" + "github.com/go-kit/log" "github.com/grafana/dskit/user" "github.com/grafana/loki/v3/pkg/loghttp/push" @@ -114,6 +115,14 @@ func Test_OtelErrorHeaderInterceptor(t *testing.T) { } } -func stubParser(_ string, _ *http.Request, _ push.TenantsRetention, _ push.Limits, _ push.UsageTracker) (*logproto.PushRequest, *push.Stats, error) { +func stubParser( + _ string, + _ *http.Request, + _ push.TenantsRetention, + _ push.Limits, + _ push.UsageTracker, + _ bool, + _ log.Logger, +) (*logproto.PushRequest, *push.Stats, error) { return &logproto.PushRequest{}, &push.Stats{}, nil } diff --git a/pkg/loghttp/push/otlp.go b/pkg/loghttp/push/otlp.go index 13aea9ee59caa..3e654b9c21ef2 100644 --- a/pkg/loghttp/push/otlp.go +++ b/pkg/loghttp/push/otlp.go @@ -8,8 +8,11 @@ import ( "io" "net/http" "sort" + "strings" "time" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -40,14 +43,14 @@ func newPushStats() *Stats { } } -func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) { +func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) { stats := newPushStats() otlpLogs, err := extractLogs(r, stats) if err != nil { return nil, nil, err } - req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats) + req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats, logPushRequestStreams, logger) return req, stats, nil } @@ -98,7 +101,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) { return req.Logs(), nil } -func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats) *logproto.PushRequest { +func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats, logPushRequestStreams bool, logger log.Logger) *logproto.PushRequest { if ld.LogRecordCount() == 0 { return &logproto.PushRequest{} } @@ -113,6 +116,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len()) streamLabels := make(model.LabelSet, 30) // we have a default labels limit of 30 so just initialize the map of same size + var pushedLabels model.LabelSet + if logPushRequestStreams { + pushedLabels = make(model.LabelSet, 30) + } shouldDiscoverServiceName := len(discoverServiceName) > 0 && !stats.IsAggregatedMetric hasServiceName := false @@ -129,6 +136,9 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten if action == IndexLabel { for _, lbl := range attributeAsLabels { streamLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) + if logPushRequestStreams && pushedLabels != nil { + pushedLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) + } if !hasServiceName && shouldDiscoverServiceName { for _, labelName := range discoverServiceName { @@ -151,6 +161,23 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten streamLabels[model.LabelName(LabelServiceName)] = model.LabelValue(ServiceUnknown) } + if logPushRequestStreams { + var sb strings.Builder + sb.WriteString("{") + labels := make([]string, 0, len(pushedLabels)) + for name, value := range pushedLabels { + labels = append(labels, fmt.Sprintf(`%s="%s"`, name, value)) + } + sb.WriteString(strings.Join(labels, ", ")) + sb.WriteString("}") + + level.Debug(logger).Log( + "msg", "OTLP push request stream before service name discovery", + "stream", sb.String(), + "service_name", streamLabels[model.LabelName(LabelServiceName)], + ) + } + if err := streamLabels.Validate(); err != nil { stats.Errs = append(stats.Errs, fmt.Errorf("invalid labels: %w", err)) continue diff --git a/pkg/loghttp/push/otlp_test.go b/pkg/loghttp/push/otlp_test.go index e2ca137f274c0..5e5632eec0082 100644 --- a/pkg/loghttp/push/otlp_test.go +++ b/pkg/loghttp/push/otlp_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" "github.com/stretchr/testify/require" @@ -508,7 +509,18 @@ func TestOTLPToLokiPushRequest(t *testing.T) { t.Run(tc.name, func(t *testing.T) { stats := newPushStats() tracker := NewMockTracker() - pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, defaultServiceDetection, tracker, stats) + pushReq := otlpToLokiPushRequest( + context.Background(), + tc.generateLogs(), + "foo", + fakeRetention{}, + tc.otlpConfig, + defaultServiceDetection, + tracker, + stats, + false, + log.NewNopLogger(), + ) require.Equal(t, tc.expectedPushRequest, *pushReq) require.Equal(t, tc.expectedStats, *stats) diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index e048546fb4083..be1d8b34b9f31 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -84,7 +84,7 @@ func (EmptyLimits) DiscoverServiceName(string) []string { } type ( - RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) + RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) RequestParserWrapper func(inner RequestParser) RequestParser ) @@ -106,8 +106,8 @@ type Stats struct { IsAggregatedMetric bool } -func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser, tracker UsageTracker) (*logproto.PushRequest, error) { - req, pushStats, err := pushRequestParser(userID, r, tenantsRetention, limits, tracker) +func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser, tracker UsageTracker, logPushRequestStreams bool) (*logproto.PushRequest, error) { + req, pushStats, err := pushRequestParser(userID, r, tenantsRetention, limits, tracker, logPushRequestStreams, logger) if err != nil { return nil, err } @@ -164,7 +164,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete return req, nil } -func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) { +func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) { // Body var body io.Reader // bodySize should always reflect the compressed size of the request body @@ -247,8 +247,13 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe pushStats.IsAggregatedMetric = true } + var beforeServiceName string + if logPushRequestStreams { + beforeServiceName = lbs.String() + } + + serviceName := ServiceUnknown if !lbs.Has(LabelServiceName) && len(discoverServiceName) > 0 && !pushStats.IsAggregatedMetric { - serviceName := ServiceUnknown for _, labelName := range discoverServiceName { if labelVal := lbs.Get(labelName); labelVal != "" { serviceName = labelVal @@ -264,6 +269,14 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe lbs = lb.Del(LabelServiceName).Labels() } + if logPushRequestStreams { + level.Debug(logger).Log( + "msg", "push request stream before service name discovery", + "labels", beforeServiceName, + "service_name", serviceName, + ) + } + var retentionPeriod time.Duration if tenantsRetention != nil { retentionPeriod = tenantsRetention.RetentionPeriodFor(userID, lbs) diff --git a/pkg/loghttp/push/push_test.go b/pkg/loghttp/push/push_test.go index e63b2c873c8de..b5609dec57a69 100644 --- a/pkg/loghttp/push/push_test.go +++ b/pkg/loghttp/push/push_test.go @@ -262,7 +262,16 @@ func TestParseRequest(t *testing.T) { } tracker := NewMockTracker() - data, err := ParseRequest(util_log.Logger, "fake", request, nil, &fakeLimits{enabled: test.enableServiceDiscovery}, ParseLokiRequest, tracker) + data, err := ParseRequest( + util_log.Logger, + "fake", + request, + nil, + &fakeLimits{enabled: test.enableServiceDiscovery}, + ParseLokiRequest, + tracker, + false, + ) structuredMetadataBytesReceived := int(structuredMetadataBytesReceivedStats.Value()["total"].(int64)) - previousStructuredMetadataBytesReceived previousStructuredMetadataBytesReceived += structuredMetadataBytesReceived @@ -355,7 +364,7 @@ func Test_ServiceDetection(t *testing.T) { request := createRequest("/loki/api/v1/push", strings.NewReader(body)) limits := &fakeLimits{enabled: true, labels: []string{"foo"}} - data, err := ParseRequest(util_log.Logger, "fake", request, nil, limits, ParseLokiRequest, tracker) + data, err := ParseRequest(util_log.Logger, "fake", request, nil, limits, ParseLokiRequest, tracker, false) require.NoError(t, err) require.Equal(t, labels.FromStrings("foo", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels) @@ -366,7 +375,7 @@ func Test_ServiceDetection(t *testing.T) { request := createRequest("/otlp/v1/push", bytes.NewReader(body)) limits := &fakeLimits{enabled: true} - data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker) + data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, false) require.NoError(t, err) require.Equal(t, labels.FromStrings("k8s_job_name", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels) }) @@ -380,7 +389,7 @@ func Test_ServiceDetection(t *testing.T) { labels: []string{"special"}, indexAttributes: []string{"special"}, } - data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker) + data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, false) require.NoError(t, err) require.Equal(t, labels.FromStrings("special", "sauce", LabelServiceName, "sauce").String(), data.Streams[0].Labels) }) @@ -394,7 +403,7 @@ func Test_ServiceDetection(t *testing.T) { labels: []string{"special"}, indexAttributes: []string{}, } - data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker) + data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, false) require.NoError(t, err) require.Equal(t, labels.FromStrings(LabelServiceName, ServiceUnknown).String(), data.Streams[0].Labels) })