Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add service_name label earlier in the ingestion pipeline #13702

Merged
merged 3 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ const (

ringAutoForgetUnhealthyPeriods = 2

labelServiceName = "service_name"
serviceUnknown = "unknown_service"
levelLabel = "detected_level"
logLevelDebug = "debug"
logLevelInfo = "info"
Expand Down Expand Up @@ -789,20 +787,6 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string,
return nil, "", 0, err
}

// We do not want to count service_name added by us in the stream limit so adding it after validating original labels.
if !ls.Has(labelServiceName) && len(vContext.discoverServiceName) > 0 {
serviceName := serviceUnknown
for _, labelName := range vContext.discoverServiceName {
if labelVal := ls.Get(labelName); labelVal != "" {
serviceName = labelVal
break
}
}

ls = labels.NewBuilder(ls).Set(labelServiceName, serviceName).Labels()
stream.Labels = ls.String()
}

lsHash := ls.Hash()

d.labelCache.Add(key, labelData{ls, lsHash})
Expand Down
125 changes: 4 additions & 121 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func TestDistributor(t *testing.T) {
t.Run(fmt.Sprintf("[%d](lines=%v)", i, tc.lines), func(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.DiscoverServiceName = nil
limits.IngestionRateMB = ingestionRateLimit
limits.IngestionBurstSizeMB = ingestionRateLimit
limits.MaxLineSize = fe.ByteSize(tc.maxLineSize)
Expand Down Expand Up @@ -140,20 +139,17 @@ func TestDistributor(t *testing.T) {
func Test_IncrementTimestamp(t *testing.T) {
incrementingDisabled := &validation.Limits{}
flagext.DefaultValues(incrementingDisabled)
incrementingDisabled.DiscoverServiceName = nil
incrementingDisabled.RejectOldSamples = false
incrementingDisabled.DiscoverLogLevels = false

incrementingEnabled := &validation.Limits{}
flagext.DefaultValues(incrementingEnabled)
incrementingEnabled.DiscoverServiceName = nil
incrementingEnabled.RejectOldSamples = false
incrementingEnabled.IncrementDuplicateTimestamp = true
incrementingEnabled.DiscoverLogLevels = false

defaultLimits := &validation.Limits{}
flagext.DefaultValues(defaultLimits)
now := time.Now()
defaultLimits.DiscoverLogLevels = false

tests := map[string]struct {
Expand Down Expand Up @@ -401,34 +397,6 @@ func Test_IncrementTimestamp(t *testing.T) {
},
},
},
"default limit adding service_name label": {
limits: defaultLimits,
push: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: "{job=\"foo\"}",
Entries: []logproto.Entry{
{Timestamp: now.Add(-2 * time.Second), Line: "hey1"},
{Timestamp: now.Add(-time.Second), Line: "hey2"},
{Timestamp: now, Line: "hey3"},
},
},
},
},
expectedPush: &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: "{job=\"foo\", service_name=\"foo\"}",
Hash: 0x86ca305b6d86e8b0,
Entries: []logproto.Entry{
{Timestamp: now.Add(-2 * time.Second), Line: "hey1"},
{Timestamp: now.Add(-time.Second), Line: "hey2"},
{Timestamp: now, Line: "hey3"},
},
},
},
},
},
}

for testName, testData := range tests {
Expand All @@ -448,7 +416,6 @@ func Test_IncrementTimestamp(t *testing.T) {
func TestDistributorPushConcurrently(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.DiscoverServiceName = nil

distributors, ingesters := prepare(t, 1, 5, limits, nil)

Expand Down Expand Up @@ -552,20 +519,6 @@ func Test_SortLabelsOnPush(t *testing.T) {
topVal := ingester.Peek()
require.Equal(t, `{a="b", buzz="f", service_name="foo"}`, topVal.Streams[0].Labels)
})

t.Run("with service_name added during ingestion", func(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
ingester := &mockIngester{}
distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })

request := makeWriteRequest(10, 10)
request.Streams[0].Labels = `{buzz="f", x="y", a="b"}`
_, err := distributors[0].Push(ctx, request)
require.NoError(t, err)
topVal := ingester.Peek()
require.Equal(t, `{a="b", buzz="f", service_name="unknown_service", x="y"}`, topVal.Streams[0].Labels)
})
}

func Test_TruncateLogLines(t *testing.T) {
Expand Down Expand Up @@ -603,7 +556,7 @@ func Test_DiscardEmptyStreamsAfterValidation(t *testing.T) {
distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })

_, err := distributors[0].Push(ctx, makeWriteRequest(1, 10))
require.Equal(t, err, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf(validation.LineTooLongErrorMsg, 5, "{foo=\"bar\", service_name=\"unknown_service\"}", 10)))
require.Equal(t, err, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf(validation.LineTooLongErrorMsg, 5, "{foo=\"bar\"}", 10)))
topVal := ingester.Peek()
require.Nil(t, topVal)
})
Expand Down Expand Up @@ -885,53 +838,9 @@ func TestParseStreamLabels(t *testing.T) {
expectedErr error
generateLimits func() *validation.Limits
}{
{
name: "service name label mapping disabled",
generateLimits: func() *validation.Limits {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.DiscoverServiceName = nil
return limits
},
origLabels: `{foo="bar"}`,
expectedLabels: labels.Labels{
{
Name: "foo",
Value: "bar",
},
},
},
{
name: "no labels defined - service name label mapping disabled",
generateLimits: func() *validation.Limits {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.DiscoverServiceName = nil
return limits
},
origLabels: `{}`,
expectedErr: fmt.Errorf(validation.MissingLabelsErrorMsg),
},
{
name: "service name label enabled",
origLabels: `{foo="bar"}`,
generateLimits: func() *validation.Limits {
return defaultLimit
},
expectedLabels: labels.Labels{
{
Name: "foo",
Value: "bar",
},
{
Name: labelServiceName,
Value: serviceUnknown,
},
},
},
{
name: "service name label should not get counted against max labels count",
origLabels: `{foo="bar"}`,
origLabels: `{foo="bar", service_name="unknown_service"}`,
generateLimits: func() *validation.Limits {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
Expand All @@ -944,33 +853,8 @@ func TestParseStreamLabels(t *testing.T) {
Value: "bar",
},
{
Name: labelServiceName,
Value: serviceUnknown,
},
},
},
{
name: "use label service as service name",
origLabels: `{container="nginx", foo="bar", service="auth"}`,
generateLimits: func() *validation.Limits {
return defaultLimit
},
expectedLabels: labels.Labels{
{
Name: "container",
Value: "nginx",
},
{
Name: "foo",
Value: "bar",
},
{
Name: "service",
Value: "auth",
},
{
Name: labelServiceName,
Value: "auth",
Name: loghttp_push.LabelServiceName,
Value: loghttp_push.ServiceUnknown,
},
},
},
Expand Down Expand Up @@ -1562,7 +1446,6 @@ func Test_DetectLogLevels(t *testing.T) {
flagext.DefaultValues(limits)

limits.DiscoverLogLevels = discoverLogLevels
limits.DiscoverServiceName = nil
limits.AllowStructuredMetadata = true
return limits, &mockIngester{}
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,14 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea
validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, ctx.userID).Inc()
return fmt.Errorf(validation.MissingLabelsErrorMsg)
}

numLabelNames := len(ls)
// service_name is a special case: it is often added by the Loki infrastructure, so it is not counted against the
// label-name limit. As a side effect, requests that already carry a service_name are allowed one extra label.
if ls.Has(push.LabelServiceName) {
numLabelNames--
}

if numLabelNames > ctx.maxLabelNamesPerSeries {
updateMetrics(validation.MaxLabelNamesPerSeries, ctx.userID, stream)
return fmt.Errorf(validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries)
Expand Down
47 changes: 38 additions & 9 deletions pkg/loghttp/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"net/http"
"time"

"github.com/grafana/loki/v3/pkg/logql/syntax"

"github.com/go-kit/log/level"

"github.com/grafana/loki/pkg/push"
Expand All @@ -25,7 +27,6 @@ import (
"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/loghttp"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/unmarshal"
Expand Down Expand Up @@ -57,14 +58,19 @@ var (
linesReceivedStats = analytics.NewCounter("distributor_lines_received")
)

const applicationJSON = "application/json"
// String constants shared by the push request parsing code.
const (
// applicationJSON is the JSON media type string.
applicationJSON = "application/json"
// LabelServiceName is the name of the label that ParseLokiRequest adds to
// streams that do not already carry it.
LabelServiceName = "service_name"
// ServiceUnknown is the service_name value used when none of the tenant's
// discovery labels has a non-empty value on the stream.
ServiceUnknown = "unknown_service"
)

// TenantsRetention resolves the retention period that applies to a stream.
type TenantsRetention interface {
// RetentionPeriodFor returns the retention period for the given tenant and
// stream label set.
RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration
}

// Limits exposes the per-tenant settings consulted while parsing push requests.
type Limits interface {
// OTLPConfig returns the OTLP configuration for the given tenant.
OTLPConfig(userID string) OTLPConfig
// DiscoverServiceName returns the label names that are checked, in order,
// for a non-empty value to use as the service_name label. An empty result
// means no service_name label is added.
DiscoverServiceName(userID string) []string
}

// EmptyLimits is a stateless Limits implementation: it returns the default
// OTLP config and no service name discovery labels.
type EmptyLimits struct{}
Expand All @@ -73,6 +79,10 @@ func (EmptyLimits) OTLPConfig(string) OTLPConfig {
return DefaultOTLPConfig(GlobalOTLPConfig{})
}

// DiscoverServiceName returns nil for every tenant. With no discovery labels
// configured, ParseLokiRequest does not add a service_name label.
func (EmptyLimits) DiscoverServiceName(string) []string {
return nil
}

// RequestParser turns an HTTP push request for the given tenant into a
// logproto.PushRequest, together with the Stats collected for that request.
type RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error)
// RequestParserWrapper takes an inner RequestParser and returns a RequestParser.
type RequestParserWrapper func(inner RequestParser) RequestParser

Expand Down Expand Up @@ -148,7 +158,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
return req, nil
}

func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, _ Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) {
func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) {
// Body
var body io.Reader
// bodySize should always reflect the compressed size of the request body
Expand Down Expand Up @@ -217,16 +227,33 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe
pushStats.ContentType = contentType
pushStats.ContentEncoding = contentEncoding

for _, s := range req.Streams {
discoverServiceName := limits.DiscoverServiceName(userID)
for i := range req.Streams {
s := req.Streams[i]
pushStats.StreamLabelsSize += int64(len(s.Labels))

var lbs labels.Labels
if tenantsRetention != nil || tracker != nil {
lbs, err = syntax.ParseLabels(s.Labels)
if err != nil {
return nil, nil, fmt.Errorf("couldn't parse labels: %w", err)
lbs, err := syntax.ParseLabels(s.Labels)
if err != nil {
return nil, nil, fmt.Errorf("couldn't parse labels: %w", err)
}

if !lbs.Has(LabelServiceName) && len(discoverServiceName) > 0 {
serviceName := ServiceUnknown
for _, labelName := range discoverServiceName {
if labelVal := lbs.Get(labelName); labelVal != "" {
serviceName = labelVal
break
}
}

lb := labels.NewBuilder(lbs)
lbs = lb.Set(LabelServiceName, serviceName).Labels()
s.Labels = lbs.String()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we also need to write it back with `req.Streams[i] = s`: `s` is a copy of the slice element, so the updated `s.Labels` is otherwise lost.


// Remove the added label after it's added to the stream so it's not consumed by subsequent steps
Copy link
Member

@na-- na-- Jul 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this is the right call here 🤔 It's consistent with the current behavior so far, but I think it might be a bit confusing to users. They will suddenly see service_name everywhere, so they might wonder why they can't configure their retention periods with it too, right? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this is kind of confusing, but I believe the tenant retention byte counts here are used for billing, so I'm a bit cautious about changing it.

Also we've not made anything worse 😅

lbs = lb.Del(LabelServiceName).Labels()
}

var retentionPeriod time.Duration
if tenantsRetention != nil {
retentionPeriod = tenantsRetention.RetentionPeriodFor(userID, lbs)
Expand All @@ -249,6 +276,8 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe
pushStats.MostRecentEntryTimestamp = e.Timestamp
}
}

req.Streams[i] = s
}

return &req, pushStats, nil
Expand Down
Loading
Loading