From 26952fb229779f648e26b9b9696fc030d987b1ea Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Thu, 29 Aug 2024 10:36:56 -0600 Subject: [PATCH] feat: add _extracted suffix to detected fields conflicts (#13993) --- pkg/querier/querier.go | 15 ++++-- pkg/querier/querier_mock_test.go | 4 +- pkg/querier/querier_test.go | 93 ++++++++++++++++++++++++-------- 3 files changed, 86 insertions(+), 26 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index b320e5c5fd6ad..a03182ae2b942 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -1199,6 +1199,11 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p emtpyparser := "" for _, stream := range streams { + streamLbls, err := syntax.ParseLabels(stream.Labels) + if err != nil { + streamLbls = labels.EmptyLabels() + } + for _, entry := range stream.Entries { structuredMetadata := getStructuredMetadata(entry) for k, vals := range structuredMetadata { @@ -1226,7 +1231,7 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p } } - detected, parser := parseLine(entry.Line) + detected, parser := parseLine(entry.Line, streamLbls) for k, vals := range detected { df, ok := detectedFields[k] if !ok && fieldCount < limit { @@ -1283,11 +1288,11 @@ func getStructuredMetadata(entry push.Entry) map[string][]string { return result } -func parseLine(line string) (map[string][]string, *string) { +func parseLine(line string, streamLbls labels.Labels) (map[string][]string, *string) { parser := "logfmt" logFmtParser := logql_log.NewLogfmtParser(true, false) - lbls := logql_log.NewBaseLabelsBuilder().ForLabels(labels.EmptyLabels(), 0) + lbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, 0) _, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls) if !logfmtSuccess || lbls.HasErr() { parser = "json" @@ -1301,6 +1306,10 @@ func parseLine(line string) (map[string][]string, *string) { parsedLabels := map[string]map[string]struct{}{} for _, lbl := range lbls.LabelsResult().Labels() { + // skip indexed labels, as we only want detected fields + if streamLbls.Has(lbl.Name) { + continue + } if values, ok := parsedLabels[lbl.Name]; ok { values[lbl.Value] = struct{}{} } else { diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 4ddbab7ed2e59..c783b1bf11e34 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -575,7 +575,7 @@ func mockStreamWithLabels(from int, quantity int, labels string) logproto.Stream } func mockLogfmtStream(from int, quantity int) logproto.Stream { - return mockLogfmtStreamWithLabels(from, quantity, `{type="test"}`) + return mockLogfmtStreamWithLabels(from, quantity, `{type="test", name="foo"}`) } func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Stream { @@ -586,7 +586,7 @@ func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Str entries = append(entries, logproto.Entry{ Timestamp: time.Unix(int64(i), 0), Line: fmt.Sprintf( - `message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t`, + `message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t name=bar`, i, i, (i * 10), diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 7336c3b11bfaf..701f01bfefd38 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -1777,15 +1777,16 @@ func TestQuerier_DetectedFields(t *testing.T) { detectedFields := resp.Fields // log lines come from querier_mock_test.go // message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t - assert.Len(t, detectedFields, 7) + assert.Len(t, detectedFields, 8) expectedCardinality := map[string]uint64{ - "message": 5, - "count": 5, - "fake": 1, - "bytes": 5, - "duration": 5, - "percent": 5, - "even": 2, + "message": 5, + "count": 5, + "fake": 1, + "bytes": 5, + "duration": 5, + "percent": 5, + "even": 2, + "name_extracted": 1, } for _, d := range detectedFields { card := expectedCardinality[d.Label] @@ -1821,17 +1822,18 @@ func TestQuerier_DetectedFields(t *testing.T) { detectedFields := resp.Fields // log lines come from querier_mock_test.go // message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t - assert.Len(t, detectedFields, 9) + assert.Len(t, detectedFields, 10) expectedCardinality := map[string]uint64{ - "variable": 5, - "constant": 1, - "message": 5, - "count": 5, - "fake": 1, - "bytes": 5, - "duration": 5, - "percent": 5, - "even": 2, + "variable": 5, + "constant": 1, + "message": 5, + "count": 5, + "fake": 1, + "bytes": 5, + "duration": 5, + "percent": 5, + "even": 2, + "name_extracted": 1, } for _, d := range detectedFields { card := expectedCardinality[d.Label] @@ -1867,7 +1869,7 @@ func TestQuerier_DetectedFields(t *testing.T) { detectedFields := resp.Fields // log lines come from querier_mock_test.go // message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t - assert.Len(t, detectedFields, 7) + assert.Len(t, detectedFields, 8) var messageField, countField, bytesField, durationField, floatField, evenField *logproto.DetectedField for _, field := range detectedFields { @@ -1923,7 +1925,7 @@ func TestQuerier_DetectedFields(t *testing.T) { detectedFields := resp.Fields // log lines come from querier_mock_test.go // message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t - assert.Len(t, detectedFields, 9) + assert.Len(t, detectedFields, 10) var messageField, countField, bytesField, durationField, floatField, evenField, constantField, variableField *logproto.DetectedField for _, field := range detectedFields { @@ -1955,7 +1957,56 @@ func TestQuerier_DetectedFields(t *testing.T) { assert.Equal(t, []string{"logfmt"}, evenField.Parsers) assert.Equal(t, []string{""}, constantField.Parsers) assert.Equal(t, []string{""}, variableField.Parsers) - }) + }, + ) + + t.Run( + "adds _extracted suffix to detected fields that conflict with indexed labels", + func(t *testing.T) { + store := newStoreMock() + store.On("SelectLogs", mock.Anything, mock.Anything). + Return(mockLogfmtStreamIterator(1, 2), nil) + + queryClient := newQueryClientMock() + queryClient.On("Recv"). + Return(mockQueryResponse([]logproto.Stream{mockLogfmtStreamWithStructuredMetadata(1, 2)}), nil) + + ingesterClient := newQuerierClientMock() + ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything). + Return(queryClient, nil) + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + store, limits) + require.NoError(t, err) + + resp, err := querier.DetectedFields(ctx, &request) + require.NoError(t, err) + + detectedFields := resp.Fields + // log lines come from querier_mock_test.go + // message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t + assert.Len(t, detectedFields, 10) + + var nameField *logproto.DetectedField + for _, field := range detectedFields { + switch field.Label { + case "name_extracted": + nameField = field + } + } + + assert.NotNil(t, nameField) + assert.Equal(t, "name_extracted", nameField.Label) + assert.Equal(t, logproto.DetectedFieldString, nameField.Type) + assert.Equal(t, []string{"logfmt"}, nameField.Parsers) + assert.Equal(t, uint64(1), nameField.Cardinality) + }, + ) } func BenchmarkQuerierDetectedFields(b *testing.B) {