diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index 997a4bf7731ce..b6000b5479ecb 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -1071,6 +1071,7 @@ func containsAllIDTypes(values []string) bool {
 	return true
 }
 
+// TODO(twhitney): Delete this method and the GRPC service signature. This is now handled in the query frontend.
 func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error) {
 	expr, err := syntax.ParseLogSelector(req.Query, true)
 	if err != nil {
@@ -1134,33 +1135,6 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
 	}, nil
 }
 
-func getParsersFromExpr(expr syntax.LogSelectorExpr) []string {
-	parsers := make([]string, 0)
-	expr.Walk(func(e syntax.Expr) {
-		switch concrete := e.(type) {
-		case *syntax.LogfmtParserExpr, *syntax.LogfmtExpressionParser:
-			if !slices.Contains(parsers, "logfmt") {
-				parsers = append(parsers, "logfmt")
-			}
-		case *syntax.JSONExpressionParser:
-			if !slices.Contains(parsers, "json") {
-				parsers = append(parsers, "json")
-			}
-		case *syntax.LabelParserExpr:
-			if concrete.Op == syntax.OpParserTypeJSON {
-				if !slices.Contains(parsers, "json") {
-					parsers = append(parsers, "json")
-				}
-			}
-		}
-		// bail if we found both parsers
-		if len(parsers) == 2 {
-			return
-		}
-	})
-	return parsers
-}
-
 type parsedFields struct {
 	sketch    *hyperloglog.Sketch
 	fieldType logproto.DetectedFieldType
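Aside on what the querier loses here: the deleted `getParsersFromExpr` derived parser hints from the query AST rather than from log content, while the query-frontend code added below (`parseDetectedFields`) infers a parser per log line instead. For reference, a minimal standalone sketch of the AST-walk approach being removed, built only on the `syntax` calls visible in this hunk (the dedup of repeated stages is omitted for brevity):

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/logql/syntax"
)

// parsersFromQuery mirrors the deleted getParsersFromExpr: it walks the
// parsed LogQL AST and records which parser stages the query itself names.
func parsersFromQuery(q string) ([]string, error) {
	expr, err := syntax.ParseLogSelector(q, true)
	if err != nil {
		return nil, err
	}
	var parsers []string
	expr.Walk(func(e syntax.Expr) {
		switch e.(type) {
		case *syntax.LogfmtParserExpr, *syntax.LogfmtExpressionParser:
			parsers = append(parsers, "logfmt")
		case *syntax.JSONExpressionParser:
			parsers = append(parsers, "json")
		}
	})
	return parsers, nil
}

func main() {
	p, _ := parsersFromQuery(`{foo="bar"} | logfmt | json`)
	fmt.Println(p) // [logfmt json]
}
```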
diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go
index eb8b8c3a97544..9b4928ee34c25 100644
--- a/pkg/querier/querier_test.go
+++ b/pkg/querier/querier_test.go
@@ -16,21 +16,17 @@ import (
 	ring_client "github.com/grafana/dskit/ring/client"
 	"github.com/grafana/dskit/user"
 	"github.com/prometheus/common/model"
-	"github.com/prometheus/prometheus/promql/parser"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 
 	util_log "github.com/grafana/loki/v3/pkg/util/log"
 
-	"github.com/grafana/loki/pkg/push"
-
 	"github.com/grafana/loki/v3/pkg/compactor/deletion"
 	"github.com/grafana/loki/v3/pkg/ingester/client"
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql"
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
-	"github.com/grafana/loki/v3/pkg/logqlmodel"
 	"github.com/grafana/loki/v3/pkg/querier/plan"
 	"github.com/grafana/loki/v3/pkg/storage"
 	"github.com/grafana/loki/v3/pkg/util/constants"
@@ -1736,935 +1732,3 @@ func BenchmarkQuerierDetectedLabels(b *testing.B) {
 		assert.NoError(b, err)
 	}
 }
-
-func TestQuerier_DetectedFields(t *testing.T) {
-	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
-	require.NoError(t, err)
-	ctx := user.InjectOrgID(context.Background(), "test")
-
-	conf := mockQuerierConfig()
-	conf.IngesterQueryStoreMaxLookback = 0
-
-	request := logproto.DetectedFieldsRequest{
-		Start:      time.Now().Add(-1 * time.Minute),
-		End:        time.Now(),
-		Query:      `{type="test"}`,
-		LineLimit:  1000,
-		FieldLimit: 1000,
-	}
-
-	t.Run("returns detected fields from queried logs", func(t *testing.T) {
-		store := newStoreMock()
-		store.On("SelectLogs", mock.Anything, mock.Anything).
-			Return(mockLogfmtStreamIterator(1, 5), nil)
-
-		queryClient := newQueryClientMock()
-		queryClient.On("Recv").
-			Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, 5)}), nil)
-
-		ingesterClient := newQuerierClientMock()
-		ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
-			Return(queryClient, nil)
-
-		querier, err := newQuerier(
-			conf,
-			mockIngesterClientConfig(),
-			newIngesterClientMockFactory(ingesterClient),
-			mockReadRingWithOneActiveIngester(),
-			&mockDeleteGettter{},
-			store, limits)
-		require.NoError(t, err)
-
-		resp, err := querier.DetectedFields(ctx, &request)
-		require.NoError(t, err)
-
-		detectedFields := resp.Fields
-		// log lines come from querier_mock_test.go
-		// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
-		assert.Len(t, detectedFields, 8)
-		expectedCardinality := map[string]uint64{
-			"message":        5,
-			"count":          5,
-			"fake":           1,
-			"bytes":          5,
-			"duration":       5,
-			"percent":        5,
-			"even":           2,
-			"name_extracted": 1,
-		}
-		for _, d := range detectedFields {
-			card := expectedCardinality[d.Label]
-			assert.Equal(t, card, d.Cardinality, "Expected cardinality mismatch for: %s", d.Label)
-		}
-	})
-
-	t.Run("returns detected fields with structured metadata from queried logs", func(t *testing.T) {
-		store := newStoreMock()
-		store.On("SelectLogs", mock.Anything, mock.Anything).
-			Return(mockLogfmtStreamIterator(1, 5), nil)
-
-		queryClient := newQueryClientMock()
-		queryClient.On("Recv").
-			Return(mockQueryResponse([]logproto.Stream{mockLogfmtStreamWithStructuredMetadata(1, 5)}), nil)
-
-		ingesterClient := newQuerierClientMock()
-		ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
-			Return(queryClient, nil)
-
-		querier, err := newQuerier(
-			conf,
-			mockIngesterClientConfig(),
-			newIngesterClientMockFactory(ingesterClient),
-			mockReadRingWithOneActiveIngester(),
-			&mockDeleteGettter{},
-			store, limits)
-		require.NoError(t, err)
-
-		resp, err := querier.DetectedFields(ctx, &request)
-		require.NoError(t, err)
-
-		detectedFields := resp.Fields
-		// log lines come from querier_mock_test.go
-		// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
-		assert.Len(t, detectedFields, 10)
-		expectedCardinality := map[string]uint64{
-			"variable":       5,
-			"constant":       1,
-			"message":        5,
-			"count":          5,
-			"fake":           1,
-			"bytes":          5,
-			"duration":       5,
-			"percent":        5,
-			"even":           2,
-			"name_extracted": 1,
-		}
-		for _, d := range detectedFields {
-			card := expectedCardinality[d.Label]
-			assert.Equal(t, card, d.Cardinality, "Expected cardinality mismatch for: %s", d.Label)
-		}
-	})
-
-	t.Run("correctly identifies different field types", func(t *testing.T) {
-		store := newStoreMock()
-		store.On("SelectLogs", mock.Anything, mock.Anything).
-			Return(mockLogfmtStreamIterator(1, 2), nil)
-
-		queryClient := newQueryClientMock()
-		queryClient.On("Recv").
-			Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, 2)}), nil)
-
-		ingesterClient := newQuerierClientMock()
-		ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
-			Return(queryClient, nil)
-
-		querier, err := newQuerier(
-			conf,
-			mockIngesterClientConfig(),
-			newIngesterClientMockFactory(ingesterClient),
-			mockReadRingWithOneActiveIngester(),
-			&mockDeleteGettter{},
-			store, limits)
-		require.NoError(t, err)
-
-		resp, err := querier.DetectedFields(ctx, &request)
-		require.NoError(t, err)
-
-		detectedFields := resp.Fields
-		// log lines come from querier_mock_test.go
-		// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
-		assert.Len(t, detectedFields, 8)
-
-		var messageField, countField, bytesField, durationField, floatField, evenField *logproto.DetectedField
-		for _, field := range detectedFields {
-			switch field.Label {
-			case "message":
-				messageField = field
-			case "count":
-				countField = field
-			case "bytes":
-				bytesField = field
-			case "duration":
-				durationField = field
-			case "percent":
-				floatField = field
-			case "even":
-				evenField = field
-			}
-		}
-
-		assert.Equal(t, logproto.DetectedFieldString, messageField.Type)
-		assert.Equal(t, logproto.DetectedFieldInt, countField.Type)
-		assert.Equal(t, logproto.DetectedFieldBytes, bytesField.Type)
-		assert.Equal(t, logproto.DetectedFieldDuration, durationField.Type)
-		assert.Equal(t, logproto.DetectedFieldFloat, floatField.Type)
-		assert.Equal(t, logproto.DetectedFieldBoolean, evenField.Type)
-	})
-
-	t.Run("correctly identifies parser to use with logfmt and structured metadata", func(t *testing.T) {
-		store := newStoreMock()
-		store.On("SelectLogs", mock.Anything, mock.Anything).
-			Return(mockLogfmtStreamIterator(1, 2), nil)
-
-		queryClient := newQueryClientMock()
-		queryClient.On("Recv").
-			Return(mockQueryResponse([]logproto.Stream{mockLogfmtStreamWithStructuredMetadata(1, 2)}), nil)
-
-		ingesterClient := newQuerierClientMock()
-		ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
-			Return(queryClient, nil)
-
-		querier, err := newQuerier(
-			conf,
-			mockIngesterClientConfig(),
-			newIngesterClientMockFactory(ingesterClient),
-			mockReadRingWithOneActiveIngester(),
-			&mockDeleteGettter{},
-			store, limits)
-		require.NoError(t, err)
-
-		resp, err := querier.DetectedFields(ctx, &request)
-		require.NoError(t, err)
-
-		detectedFields := resp.Fields
-		// log lines come from querier_mock_test.go
-		// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
-		assert.Len(t, detectedFields, 10)
-
-		var messageField, countField, bytesField, durationField, floatField, evenField, constantField, variableField *logproto.DetectedField
-		for _, field := range detectedFields {
-			switch field.Label {
-			case "message":
-				messageField = field
-			case "count":
-				countField = field
-			case "bytes":
-				bytesField = field
-			case "duration":
-				durationField = field
-			case "percent":
-				floatField = field
-			case "even":
-				evenField = field
-			case "constant":
-				constantField = field
-			case "variable":
-				variableField = field
-			}
-		}
-
-		assert.Equal(t, []string{"logfmt"}, messageField.Parsers)
-		assert.Equal(t, []string{"logfmt"}, countField.Parsers)
-		assert.Equal(t, []string{"logfmt"}, bytesField.Parsers)
-		assert.Equal(t, []string{"logfmt"}, durationField.Parsers)
-		assert.Equal(t, []string{"logfmt"}, floatField.Parsers)
-		assert.Equal(t, []string{"logfmt"}, evenField.Parsers)
-		assert.Equal(t, []string(nil), constantField.Parsers)
-		assert.Equal(t, []string(nil), variableField.Parsers)
-	},
-	)
-
-	t.Run(
-		"adds _extracted suffix to detected fields that conflict with indexed labels",
-		func(t *testing.T) {
-			store := newStoreMock()
-			store.On("SelectLogs", mock.Anything, mock.Anything).
-				Return(mockLogfmtStreamIterator(1, 2), nil)
-
-			queryClient := newQueryClientMock()
-			queryClient.On("Recv").
-				Return(mockQueryResponse([]logproto.Stream{mockLogfmtStreamWithStructuredMetadata(1, 2)}), nil)
-
-			ingesterClient := newQuerierClientMock()
-			ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
-				Return(queryClient, nil)
-
-			querier, err := newQuerier(
-				conf,
-				mockIngesterClientConfig(),
-				newIngesterClientMockFactory(ingesterClient),
-				mockReadRingWithOneActiveIngester(),
-				&mockDeleteGettter{},
-				store, limits)
-			require.NoError(t, err)
-
-			resp, err := querier.DetectedFields(ctx, &request)
-			require.NoError(t, err)
-
-			detectedFields := resp.Fields
-			// log lines come from querier_mock_test.go
-			// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
-			assert.Len(t, detectedFields, 10)
-
-			var nameField *logproto.DetectedField
-			for _, field := range detectedFields {
-				switch field.Label {
-				case "name_extracted":
-					nameField = field
-				}
-			}
-
-			assert.NotNil(t, nameField)
-			assert.Equal(t, "name_extracted", nameField.Label)
-			assert.Equal(t, logproto.DetectedFieldString, nameField.Type)
-			assert.Equal(t, []string{"logfmt"}, nameField.Parsers)
-			assert.Equal(t, uint64(1), nameField.Cardinality)
-		},
-	)
-}
-
-func BenchmarkQuerierDetectedFields(b *testing.B) {
-	limits, _ := validation.NewOverrides(defaultLimitsTestConfig(), nil)
-	ctx := user.InjectOrgID(context.Background(), "test")
-
-	conf := mockQuerierConfig()
-	conf.IngesterQueryStoreMaxLookback = 0
-
-	request := logproto.DetectedFieldsRequest{
-		Start:      time.Now().Add(-1 * time.Minute),
-		End:        time.Now(),
-		Query:      `{type="test"}`,
-		LineLimit:  1000,
-		FieldLimit: 1000,
-	}
-
-	store := newStoreMock()
-	store.On("SelectLogs", mock.Anything, mock.Anything).
-		Return(mockLogfmtStreamIterator(1, 2), nil)
-
-	queryClient := newQueryClientMock()
-	queryClient.On("Recv").
-		Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, 2)}), nil)
-
-	ingesterClient := newQuerierClientMock()
-	ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
-		Return(queryClient, nil)
-
-	querier, _ := newQuerier(
-		conf,
-		mockIngesterClientConfig(),
-		newIngesterClientMockFactory(ingesterClient),
-		mockReadRingWithOneActiveIngester(),
-		&mockDeleteGettter{},
-		store, limits)
-
-	b.ReportAllocs()
-	b.ResetTimer()
-
-	for i := 0; i < b.N; i++ {
-		_, err := querier.DetectedFields(ctx, &request)
-		assert.NoError(b, err)
-	}
-}
-
-func Test_getParsersFromExpr(t *testing.T) {
-	t.Run("detects logfmt parser", func(t *testing.T) {
-		exprStr := `{foo="bar"} | logfmt`
-		expr, err := syntax.ParseLogSelector(exprStr, true)
-		require.NoError(t, err)
-		assert.Equal(t, []string{"logfmt"}, getParsersFromExpr(expr))
-	})
-
-	t.Run("detects json parser", func(t *testing.T) {
-		exprStr := `{foo="bar"} | json`
-		expr, err := syntax.ParseLogSelector(exprStr, true)
-		require.NoError(t, err)
-		assert.Equal(t, []string{"json"}, getParsersFromExpr(expr))
-	})
-
-	t.Run("detects multiple parsers", func(t *testing.T) {
-		exprStr := `{foo="bar"} | logfmt | json`
-		expr, err := syntax.ParseLogSelector(exprStr, true)
-		require.NoError(t, err)
-		assert.Equal(t, []string{"logfmt", "json"}, getParsersFromExpr(expr))
-	})
-
-	t.Run("detects logfmt expression parser", func(t *testing.T) {
-		exprStr := `{foo="bar"} | logfmt msg="message"`
-		expr, err := syntax.ParseLogSelector(exprStr, true)
-		require.NoError(t, err)
-		assert.Equal(t, []string{"logfmt"}, getParsersFromExpr(expr))
-	})
-
-	t.Run("detects json expression parser", func(t *testing.T) {
-		exprStr := `{foo="bar"} | json first_server="servers[0]"`
-		expr, err := syntax.ParseLogSelector(exprStr, true)
-		require.NoError(t, err)
-		assert.Equal(t, []string{"json"}, getParsersFromExpr(expr))
-	})
-
-	t.Run("detects multiple expression parsers", func(t *testing.T) {
-		exprStr := `{foo="bar"} | logfmt msg="message" | json first_server="servers[0]"`
-		expr, err := syntax.ParseLogSelector(exprStr, true)
-		require.NoError(t, err)
-		assert.Equal(t, []string{"logfmt", "json"}, getParsersFromExpr(expr))
-	})
-}
-
-func Test_parseDetectedFeilds(t *testing.T) {
-	now := time.Now()
-
-	t.Run("when no parsers are supplied", func(t *testing.T) {
-		infoDetectdFiledMetadata := []push.LabelAdapter{
-			{
-				Name:  "detected_level",
-				Value: "info",
-			},
-		}
-
-		rulerLines := []push.Entry{
-			{Timestamp: now, Line: "ts=2024-09-05T15:36:38.757788067Z caller=grpc_logging.go:66 tenant=2419 level=info method=/cortex.Ingester/Push duration=19.098s msg=gRPC", StructuredMetadata: infoDetectdFiledMetadata},
-			{Timestamp: now, Line: "ts=2024-09-05T15:36:38.698375619Z caller=grpc_logging.go:66 tenant=29 level=info method=/cortex.Ingester/Push duration=5.471s msg=gRPC", StructuredMetadata: infoDetectdFiledMetadata},
-			{Timestamp: now, Line: "ts=2024-09-05T15:36:38.629424175Z caller=grpc_logging.go:66 tenant=2919 level=info method=/cortex.Ingester/Push duration=29.234s msg=gRPC", StructuredMetadata: infoDetectdFiledMetadata},
-		}
-
-		rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler"}`
-		rulerMetric, err := parser.ParseMetric(rulerLbls)
-		require.NoError(t, err)
-
-		rulerStream := push.Stream{
-			Labels:  rulerLbls,
-			Entries: rulerLines,
-			Hash:    rulerMetric.Hash(),
-		}
-
-		debugDetectedFieldMetadata := []push.LabelAdapter{
-			{
-				Name:  "detected_level",
-				Value: "debug",
-			},
-		}
-
-		nginxJSONLines := []push.Entry{
-			{Timestamp: now, Line: `{"host":"100.117.38.203", "user-identifier":"nader3722", "datetime":"05/Sep/2024:16:13:56 +0000", "method": "PATCH", "request": "/api/loki/v1/push", "protocol":"HTTP/2.0", "status":200, "bytes":9664, "referer": "https://www.seniorbleeding-edge.net/exploit/robust/whiteboard"}`, StructuredMetadata: debugDetectedFieldMetadata},
-			{Timestamp: now, Line: `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "DELETE", "request": "/api/mimir/v1/push", "protocol":"HTTP/1.1", "status":200, "bytes":18688, "referer": "https://www.districtiterate.biz/synergistic/next-generation/extend"}`, StructuredMetadata: debugDetectedFieldMetadata},
-			{Timestamp: now, Line: `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "GET", "request": "/api/loki/v1/label/names", "protocol":"HTTP/1.1", "status":200, "bytes":9314, "referer": "https://www.dynamicimplement.info/enterprise/distributed/incentivize/strategic"}`, StructuredMetadata: debugDetectedFieldMetadata},
-		}
-
-		nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json" }`
-		nginxMetric, err := parser.ParseMetric(nginxLbls)
-		require.NoError(t, err)
-
-		nginxStream := push.Stream{
-			Labels:  nginxLbls,
-			Entries: nginxJSONLines,
-			Hash:    nginxMetric.Hash(),
-		}
-
-		t.Run("detect logfmt fields when with no supplied parsers", func(t *testing.T) {
-			df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream}))
-			for _, expected := range []string{"ts", "caller", "tenant", "level", "method", "duration", "msg"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 1)
-				require.Equal(t, "logfmt", parsers[0])
-			}
-
-			// no parsers for structed metadata
-			for _, expected := range []string{"detected_level"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 0)
-			}
-		})
-
-		t.Run("detect json fields when with no supplied parsers", func(t *testing.T) {
-			df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{nginxStream}))
-			for _, expected := range []string{"host", "user_identifier", "datetime", "method", "request", "protocol", "status", "bytes", "referer"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 1)
-				require.Equal(t, "json", parsers[0])
-			}
-
-			// no parsers for structed metadata
-			for _, expected := range []string{"detected_level"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 0)
-			}
-		})
-
-		t.Run("detect mixed fields when with no supplied parsers", func(t *testing.T) {
-			df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, nginxStream}))
-
-			for _, expected := range []string{"ts", "caller", "tenant", "level", "duration", "msg"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 1, "expected only logfmt parser for %s", expected)
-				require.Equal(t, "logfmt", parsers[0], "expected only logfmt parser for %s", expected)
-			}
-
-			for _, expected := range []string{"host", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 1, "expected only json parser for %s", expected)
-				require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected)
-			}
-
-			// multiple parsers for fields that exist in both streams
-			for _, expected := range []string{"method"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected)
-				require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected)
-				require.Contains(t, parsers, "json", "expected json parser for %s", expected)
-			}
-
-			// no parsers for structed metadata
-			for _, expected := range []string{"detected_level"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 0)
-			}
-		})
-
-		t.Run("correctly applies _extracted for a single stream", func(t *testing.T) {
-			rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}`
-			rulerMetric, err := parser.ParseMetric(rulerLbls)
-			require.NoError(t, err)
-
-			rulerStream := push.Stream{
-				Labels:  rulerLbls,
-				Entries: rulerLines,
-				Hash:    rulerMetric.Hash(),
-			}
-
-			df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream}))
-			for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "method", "duration", "msg"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 1)
-				require.Equal(t, "logfmt", parsers[0])
-			}
-
-			// no parsers for structed metadata
-			for _, expected := range []string{"detected_level"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 0)
-			}
-		})
-
-		t.Run("correctly applies _extracted for multiple streams", func(t *testing.T) {
-			rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}`
-			rulerMetric, err := parser.ParseMetric(rulerLbls)
-			require.NoError(t, err)
-
-			rulerStream := push.Stream{
-				Labels:  rulerLbls,
-				Entries: rulerLines,
-				Hash:    rulerMetric.Hash(),
-			}
-
-			nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json", host="localhost"}`
-			nginxMetric, err := parser.ParseMetric(nginxLbls)
-			require.NoError(t, err)
-
-			nginxStream := push.Stream{
-				Labels:  nginxLbls,
-				Entries: nginxJSONLines,
-				Hash:    nginxMetric.Hash(),
-			}
-
-			df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, nginxStream}))
-			for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "duration", "msg"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 1)
-				require.Equal(t, "logfmt", parsers[0])
-			}
-
-			for _, expected := range []string{"host_extracted", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 1, "expected only json parser for %s", expected)
-				require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected)
-			}
-
-			// multiple parsers for fields that exist in both streams
-			for _, expected := range []string{"method"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected)
-				require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected)
-				require.Contains(t, parsers, "json", "expected json parser for %s", expected)
-			}
-
-			// no parsers for structed metadata
-			for _, expected := range []string{"detected_level"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 0)
-			}
-		})
-	})
-
-	t.Run("when parsers are supplied", func(t *testing.T) {
-		infoDetectdFiledMetadata := []push.LabelAdapter{
-			{
-				Name:  "detected_level",
-				Value: "info",
-			},
-		}
-
-		parsedRulerFields := func(ts, tenant, duration string) []push.LabelAdapter {
-			return []push.LabelAdapter{
-				{
-					Name:  "ts",
-					Value: ts,
-				},
-				{
-					Name:  "caller",
-					Value: "grpc_logging.go:66",
-				},
-				{
-					Name:  "tenant",
-					Value: tenant,
-				},
-				{
-					Name:  "level",
-					Value: "info",
-				},
-				{
-					Name:  "method",
-					Value: "/cortex.Ingester/Push",
-				},
-				{
-					Name:  "duration",
-					Value: duration,
-				},
-				{
-					Name:  "msg",
-					Value: "gRPC",
-				},
-			}
-		}
-
-		rulerLines := []push.Entry{
-			{
-				Timestamp:          now,
-				Line:               "ts=2024-09-05T15:36:38.757788067Z caller=grpc_logging.go:66 tenant=2419 level=info method=/cortex.Ingester/Push duration=19.098s msg=gRPC",
-				StructuredMetadata: infoDetectdFiledMetadata,
-				Parsed:             parsedRulerFields("2024-09-05T15:36:38.757788067Z", "2419", "19.098s"),
-			},
-			{
-				Timestamp:          now,
-				Line:               "ts=2024-09-05T15:36:38.698375619Z caller=grpc_logging.go:66 tenant=29 level=info method=/cortex.Ingester/Push duration=5.471s msg=gRPC",
-				StructuredMetadata: infoDetectdFiledMetadata,
-				Parsed:             parsedRulerFields("2024-09-05T15:36:38.698375619Z", "29", "5.471s"),
-			},
-			{
-				Timestamp:          now,
-				Line:               "ts=2024-09-05T15:36:38.629424175Z caller=grpc_logging.go:66 tenant=2919 level=info method=/cortex.Ingester/Push duration=29.234s msg=gRPC",
-				StructuredMetadata: infoDetectdFiledMetadata,
-				Parsed:             parsedRulerFields("2024-09-05T15:36:38.629424175Z", "2919", "29.234s"),
-			},
-		}
-
-		rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler"}`
-		rulerMetric, err := parser.ParseMetric(rulerLbls)
-		require.NoError(t, err)
-
-		rulerStream := push.Stream{
-			Labels:  rulerLbls,
-			Entries: rulerLines,
-			Hash:    rulerMetric.Hash(),
-		}
-
-		debugDetectedFieldMetadata := []push.LabelAdapter{
-			{
-				Name:  "detected_level",
-				Value: "debug",
-			},
-		}
-
-		parsedNginxFields := func(host, userIdentifier, datetime, method, request, protocol, status, bytes, referer string) []push.LabelAdapter {
-			return []push.LabelAdapter{
-				{
-					Name:  "host",
-					Value: host,
-				},
-				{
-					Name:  "user_identifier",
-					Value: userIdentifier,
-				},
-				{
-					Name:  "datetime",
-					Value: datetime,
-				},
-				{
-					Name:  "method",
-					Value: method,
-				},
-				{
-					Name:  "request",
-					Value: request,
-				},
-				{
-					Name:  "protocol",
-					Value: protocol,
-				},
-				{
-					Name:  "status",
-					Value: status,
-				},
-				{
-					Name:  "bytes",
-					Value: bytes,
-				},
-				{
-					Name:  "referer",
-					Value: referer,
-				},
-			}
-		}
-
-		nginxJSONLines := []push.Entry{
-			{
-				Timestamp:          now,
-				Line:               `{"host":"100.117.38.203", "user-identifier":"nader3722", "datetime":"05/Sep/2024:16:13:56 +0000", "method": "PATCH", "request": "/api/loki/v1/push", "protocol":"HTTP/2.0", "status":200, "bytes":9664, "referer": "https://www.seniorbleeding-edge.net/exploit/robust/whiteboard"}`,
-				StructuredMetadata: debugDetectedFieldMetadata,
-				Parsed:             parsedNginxFields("100.117.38.203", "nadre3722", "05/Sep/2024:16:13:56 +0000", "PATCH", "/api/loki/v1/push", "HTTP/2.0", "200", "9664", "https://www.seniorbleeding-edge.net/exploit/robust/whiteboard"),
-			},
-			{
-				Timestamp:          now,
-				Line:               `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "DELETE", "request": "/api/mimir/v1/push", "protocol":"HTTP/1.1", "status":200, "bytes":18688, "referer": "https://www.districtiterate.biz/synergistic/next-generation/extend"}`,
-				StructuredMetadata: debugDetectedFieldMetadata,
-				Parsed:             parsedNginxFields("66.134.9.30", "-", "05/Sep/2024:16:13:55 +0000", "DELETE", "/api/mimir/v1/push", "HTTP/1.1", "200", "18688", "https://www.districtiterate.biz/synergistic/next-generation/extend"),
-			},
-			{
-				Timestamp:          now,
-				Line:               `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "GET", "request": "/api/loki/v1/label/names", "protocol":"HTTP/1.1", "status":200, "bytes":9314, "referer": "https://www.dynamicimplement.info/enterprise/distributed/incentivize/strategic"}`,
-				StructuredMetadata: debugDetectedFieldMetadata,
-				Parsed:             parsedNginxFields("66.134.9.30", "-", "05/Sep/2024:16:13:55 +0000", "GET", "/api/loki/v1/label/names", "HTTP/1.1", "200", "9314", "https://www.dynamicimplement.info/enterprise/distributed/incentivize/strategic"),
-			},
-		}
-
-		nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json" }`
-		nginxMetric, err := parser.ParseMetric(nginxLbls)
-		require.NoError(t, err)
-
-		nginxStream := push.Stream{
-			Labels:  nginxLbls,
-			Entries: nginxJSONLines,
-			Hash:    nginxMetric.Hash(),
-		}
-
-		t.Run("detect logfmt fields", func(t *testing.T) {
-			df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream}))
-			for _, expected := range []string{"ts", "caller", "tenant", "level", "method", "duration", "msg"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 1)
-				require.Equal(t, "logfmt", parsers[0])
-			}
-
-			// no parsers for structed metadata
-			for _, expected := range []string{"detected_level"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 0)
-			}
-		})
-
-		t.Run("detect json fields", func(t *testing.T) {
-			df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{nginxStream}))
-			for _, expected := range []string{"host", "user_identifier", "datetime", "method", "request", "protocol", "status", "bytes", "referer"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 1)
-				require.Equal(t, "json", parsers[0])
-			}
-
-			// no parsers for structed metadata
-			for _, expected := range []string{"detected_level"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 0)
-			}
-		})
-
-		t.Run("detect mixed fields", func(t *testing.T) {
-			df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, nginxStream}))
-
-			for _, expected := range []string{"ts", "caller", "tenant", "level", "duration", "msg"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 1, "expected only logfmt parser for %s", expected)
-				require.Equal(t, "logfmt", parsers[0], "expected only logfmt parser for %s", expected)
-			}
-
-			for _, expected := range []string{"host", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 1, "expected only json parser for %s", expected)
-				require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected)
-			}
-
-			// multiple parsers for fields that exist in both streams
-			for _, expected := range []string{"method"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected)
-				require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected)
-				require.Contains(t, parsers, "json", "expected json parser for %s", expected)
-			}
-
-			// no parsers for structed metadata
-			for _, expected := range []string{"detected_level"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 0)
-			}
-		})
-
-		t.Run("correctly applies _extracted for a single stream", func(t *testing.T) {
-			rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}`
-			rulerMetric, err := parser.ParseMetric(rulerLbls)
-			require.NoError(t, err)
-
-			rulerStream := push.Stream{
-				Labels:  rulerLbls,
-				Entries: rulerLines,
-				Hash:    rulerMetric.Hash(),
-			}
-
-			df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream}))
-			for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "method", "duration", "msg"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 1)
-				require.Equal(t, "logfmt", parsers[0])
-			}
-
-			// no parsers for structed metadata
-			for _, expected := range []string{"detected_level"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 0)
-			}
-		})
-
-		t.Run("correctly applies _extracted for multiple streams", func(t *testing.T) {
-			rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}`
-			rulerMetric, err := parser.ParseMetric(rulerLbls)
-			require.NoError(t, err)
-
-			rulerStream := push.Stream{
-				Labels:  rulerLbls,
-				Entries: rulerLines,
-				Hash:    rulerMetric.Hash(),
-			}
-
-			nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json", host="localhost"}`
-			nginxMetric, err := parser.ParseMetric(nginxLbls)
-			require.NoError(t, err)
-
-			nginxStream := push.Stream{
-				Labels:  nginxLbls,
-				Entries: nginxJSONLines,
-				Hash:    nginxMetric.Hash(),
-			}
-
-			df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, nginxStream}))
-			for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "duration", "msg"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 1)
-				require.Equal(t, "logfmt", parsers[0])
-			}
-
-			for _, expected := range []string{"host_extracted", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 1, "expected only json parser for %s", expected)
-				require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected)
-			}
-
-			// multiple parsers for fields that exist in both streams
-			for _, expected := range []string{"method"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected)
-				require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected)
-				require.Contains(t, parsers, "json", "expected json parser for %s", expected)
-			}
-
-			// no parsers for structed metadata
-			for _, expected := range []string{"detected_level"} {
-				require.Contains(t, df, expected)
-				parsers := df[expected].parsers
-
-				require.Len(t, parsers, 0)
-			}
-		})
-	})
-
-	t.Run("handles level in all the places", func(t *testing.T) {
-		rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house", level="debug"}`
-		rulerMetric, err := parser.ParseMetric(rulerLbls)
-		require.NoError(t, err)
-
-		rulerStream := push.Stream{
-			Labels: rulerLbls,
-			Entries: []push.Entry{
-				{
-					Timestamp: now,
-					Line:      "ts=2024-09-05T15:36:38.757788067Z caller=grpc_logging.go:66 tenant=2419 level=info method=/cortex.Ingester/Push duration=19.098s msg=gRPC",
-					StructuredMetadata: []push.LabelAdapter{
-						{
-							Name:  "detected_level",
-							Value: "debug",
-						},
-					},
-					Parsed: []push.LabelAdapter{
-						{
-							Name:  "level",
-							Value: "info",
-						},
-					},
-				},
-			},
-			Hash: rulerMetric.Hash(),
-		}
-
-		df := parseDetectedFields(uint32(20), logqlmodel.Streams([]push.Stream{rulerStream, rulerStream}))
-
-		detectedLevelField := df["detected_level"]
-		require.Len(t, detectedLevelField.parsers, 0)
-		require.Equal(t, uint64(1), detectedLevelField.sketch.Estimate())
-
-		levelField := df["level_extracted"]
-		require.Len(t, levelField.parsers, 1)
-		require.Contains(t, levelField.parsers, "logfmt")
-		require.Equal(t, uint64(1), levelField.sketch.Estimate())
-	})
-}
&DetectedFieldsResponse{ + Response: &logproto.DetectedFieldsResponse{ + Fields: fields, + FieldLimit: r.GetFieldLimit(), + }, + Headers: re.Headers, + }, nil + }) + }) +} + +func makeDownstreamRequest( + ctx context.Context, + limits Limits, + limitedHandler, logHandler base.Handler, + req *DetectedFieldsRequest, +) (base.Response, error) { + expr, err := syntax.ParseLogSelector(req.Query, true) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) + } + + if err := validateMaxEntriesLimits(ctx, req.LineLimit, limits); err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) + } + + if err := validateMatchers(ctx, limits, expr.Matchers()); err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) + } + + lokiReq := &LokiRequest{ + Query: req.GetQuery(), + Step: req.GetStep(), + StartTs: req.GetStartTs(), + EndTs: req.GetEndTs(), + Direction: logproto.BACKWARD, + Limit: req.GetLineLimit(), + Path: "/loki/api/v1/query_range", + } + + lokiReq.Plan = &plan.QueryPlan{ + AST: expr, + } + + // Note(twhitney): The logic for parsing detected fields relies on the Entry.Parsed field being populated. + // The behavior of populating Entry.Parsed is different in ingesters and stores. + // We need to set this header to make sure Entry.Parsed is populated when getting logs from the store. + // Entries from the head block in the ingester always have the Parsed field populated. + ctx = httpreq.InjectHeader( + ctx, + httpreq.LokiEncodingFlagsHeader, + (string)(httpreq.FlagCategorizeLabels), + ) + if expr.HasFilter() { + return logHandler.Do(ctx, lokiReq) + } + return limitedHandler.Do(ctx, lokiReq) +} + +type parsedFields struct { + sketch *hyperloglog.Sketch + fieldType logproto.DetectedFieldType + parsers []string +} + +func newParsedFields(parsers []string) *parsedFields { + return &parsedFields{ + sketch: hyperloglog.New(), + fieldType: logproto.DetectedFieldString, + parsers: parsers, + } +} + +func newParsedLabels() *parsedFields { + return &parsedFields{ + sketch: hyperloglog.New(), + fieldType: logproto.DetectedFieldString, + } +} + +func (p *parsedFields) Insert(value string) { + p.sketch.Insert([]byte(value)) +} + +func (p *parsedFields) Estimate() uint64 { + return p.sketch.Estimate() +} + +func (p *parsedFields) Marshal() ([]byte, error) { + return p.sketch.MarshalBinary() +} + +func (p *parsedFields) DetermineType(value string) { + p.fieldType = determineType(value) +} + +func determineType(value string) logproto.DetectedFieldType { + if _, err := strconv.ParseInt(value, 10, 64); err == nil { + return logproto.DetectedFieldInt + } + + if _, err := strconv.ParseFloat(value, 64); err == nil { + return logproto.DetectedFieldFloat + } + + if _, err := strconv.ParseBool(value); err == nil { + return logproto.DetectedFieldBoolean + } + + if _, err := time.ParseDuration(value); err == nil { + return logproto.DetectedFieldDuration + } + + if _, err := humanize.ParseBytes(value); err == nil { + return logproto.DetectedFieldBytes + } + + return logproto.DetectedFieldString +} + +func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*parsedFields { + detectedFields := make(map[string]*parsedFields, limit) + fieldCount := uint32(0) + emtpyparsers := []string{} + + for _, stream := range streams { + streamLbls, err := syntax.ParseLabels(stream.Labels) + if err != nil { + streamLbls = labels.EmptyLabels() + } + + for _, entry := range stream.Entries { + structuredMetadata := 
getStructuredMetadata(entry) + for k, vals := range structuredMetadata { + df, ok := detectedFields[k] + if !ok && fieldCount < limit { + df = newParsedFields(emtpyparsers) + detectedFields[k] = df + fieldCount++ + } + + if df == nil { + continue + } + + detectType := true + for _, v := range vals { + parsedFields := detectedFields[k] + if detectType { + // we don't want to determine the type for every line, so we assume the type in each stream will be the same, and re-detect the type for the next stream + parsedFields.DetermineType(v) + detectType = false + } + + parsedFields.Insert(v) + } + } + + entryLbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, streamLbls.Hash()) + parsedLabels, parsers := parseEntry(entry, entryLbls) + for k, vals := range parsedLabels { + df, ok := detectedFields[k] + if !ok && fieldCount < limit { + df = newParsedFields(parsers) + detectedFields[k] = df + fieldCount++ + } + + if df == nil { + continue + } + + for _, parser := range parsers { + if !slices.Contains(df.parsers, parser) { + df.parsers = append(df.parsers, parser) + } + } + + detectType := true + for _, v := range vals { + parsedFields := detectedFields[k] + if detectType { + // we don't want to determine the type for every line, so we assume the type in each stream will be the same, and re-detect the type for the next stream + parsedFields.DetermineType(v) + detectType = false + } + + parsedFields.Insert(v) + } + } + } + } + + return detectedFields +} + +func getStructuredMetadata(entry push.Entry) map[string][]string { + labels := map[string]map[string]struct{}{} + for _, lbl := range entry.StructuredMetadata { + if values, ok := labels[lbl.Name]; ok { + values[lbl.Value] = struct{}{} + } else { + labels[lbl.Name] = map[string]struct{}{lbl.Value: {}} + } + } + + result := make(map[string][]string, len(labels)) + for lbl, values := range labels { + vals := make([]string, 0, len(values)) + for v := range values { + vals = append(vals, v) + } + result[lbl] = vals + } + + return result +} + +func parseEntry(entry push.Entry, lbls *logql_log.LabelsBuilder) (map[string][]string, []string) { + origParsed := getParsedLabels(entry) + + // if the original query has any parser expressions, then we need to differentiate the + // original stream labels from any parsed labels + for name := range origParsed { + lbls.Del(name) + } + streamLbls := lbls.LabelsResult().Stream() + lblBuilder := lbls.ForLabels(streamLbls, streamLbls.Hash()) + + parsed := make(map[string][]string, len(origParsed)) + for lbl, values := range origParsed { + if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel || + lbl == logqlmodel.PreserveErrorLabel { + continue + } + + parsed[lbl] = values + } + + line := entry.Line + parser := "json" + jsonParser := logql_log.NewJSONParser() + _, jsonSuccess := jsonParser.Process(0, []byte(line), lblBuilder) + if !jsonSuccess || lblBuilder.HasErr() { + lblBuilder.Reset() + + logFmtParser := logql_log.NewLogfmtParser(false, false) + parser = "logfmt" + _, logfmtSuccess := logFmtParser.Process(0, []byte(line), lblBuilder) + if !logfmtSuccess || lblBuilder.HasErr() { + return parsed, nil + } + } + + parsedLabels := map[string]map[string]struct{}{} + for lbl, values := range parsed { + if vals, ok := parsedLabels[lbl]; ok { + for _, value := range values { + vals[value] = struct{}{} + } + } else { + parsedLabels[lbl] = map[string]struct{}{} + for _, value := range values { + parsedLabels[lbl][value] = struct{}{} + } + } + } + + lblsResult := lblBuilder.LabelsResult().Parsed() 
+ for _, lbl := range lblsResult { + if values, ok := parsedLabels[lbl.Name]; ok { + values[lbl.Value] = struct{}{} + } else { + parsedLabels[lbl.Name] = map[string]struct{}{lbl.Value: {}} + } + } + + result := make(map[string][]string, len(parsedLabels)) + for lbl, values := range parsedLabels { + if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel || + lbl == logqlmodel.PreserveErrorLabel { + continue + } + vals := make([]string, 0, len(values)) + for v := range values { + vals = append(vals, v) + } + result[lbl] = vals + } + + return result, []string{parser} +} + +func getParsedLabels(entry push.Entry) map[string][]string { + labels := map[string]map[string]struct{}{} + for _, lbl := range entry.Parsed { + if values, ok := labels[lbl.Name]; ok { + values[lbl.Value] = struct{}{} + } else { + labels[lbl.Name] = map[string]struct{}{lbl.Value: {}} + } + } + + result := make(map[string][]string, len(labels)) + for lbl, values := range labels { + vals := make([]string, 0, len(values)) + for v := range values { + vals = append(vals, v) + } + result[lbl] = vals + } + + return result +} diff --git a/pkg/querier/queryrange/detected_fields_test.go b/pkg/querier/queryrange/detected_fields_test.go new file mode 100644 index 0000000000000..1a967f77c069d --- /dev/null +++ b/pkg/querier/queryrange/detected_fields_test.go @@ -0,0 +1,1260 @@ +package queryrange + +import ( + "context" + "fmt" + "math" + "testing" + "time" + + "github.com/grafana/dskit/user" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql/parser" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/loghttp" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/log" + logql_log "github.com/grafana/loki/v3/pkg/logql/log" + "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/logqlmodel" + base "github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase" + + "github.com/grafana/loki/pkg/push" +) + +func Test_parseDetectedFeilds(t *testing.T) { + now := time.Now() + + t.Run("when no parsers are supplied", func(t *testing.T) { + infoDetectdFiledMetadata := []push.LabelAdapter{ + { + Name: "detected_level", + Value: "info", + }, + } + + rulerLines := []push.Entry{ + { + Timestamp: now, + Line: "ts=2024-09-05T15:36:38.757788067Z caller=grpc_logging.go:66 tenant=2419 level=info method=/cortex.Ingester/Push duration=19.098s msg=gRPC", + StructuredMetadata: infoDetectdFiledMetadata, + }, + { + Timestamp: now, + Line: "ts=2024-09-05T15:36:38.698375619Z caller=grpc_logging.go:66 tenant=29 level=info method=/cortex.Ingester/Push duration=5.471s msg=gRPC", + StructuredMetadata: infoDetectdFiledMetadata, + }, + { + Timestamp: now, + Line: "ts=2024-09-05T15:36:38.629424175Z caller=grpc_logging.go:66 tenant=2919 level=info method=/cortex.Ingester/Push duration=29.234s msg=gRPC", + StructuredMetadata: infoDetectdFiledMetadata, + }, + } + + rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler"}` + rulerMetric, err := parser.ParseMetric(rulerLbls) + require.NoError(t, err) + + rulerStream := push.Stream{ + Labels: rulerLbls, + Entries: rulerLines, + Hash: rulerMetric.Hash(), + } + + debugDetectedFieldMetadata := []push.LabelAdapter{ + { + Name: "detected_level", + Value: "debug", + }, + } + + nginxJSONLines := []push.Entry{ + { + Timestamp: now, + Line: `{"host":"100.117.38.203", "user-identifier":"nader3722", 
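A note on `determineType` above: the cascade order matters because several branches accept overlapping inputs; for example `"42"` would also satisfy `humanize.ParseBytes`, so the integer check has to run first. A runnable sketch of the same cascade outside the Loki tree (`classify` and its string results are illustrative stand-ins, not part of the PR):

```go
package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/dustin/go-humanize"
)

// classify mirrors the determineType cascade: most specific parse first,
// falling through to "string" when nothing else accepts the value.
func classify(value string) string {
	if _, err := strconv.ParseInt(value, 10, 64); err == nil {
		return "int"
	}
	if _, err := strconv.ParseFloat(value, 64); err == nil {
		return "float"
	}
	if _, err := strconv.ParseBool(value); err == nil {
		return "boolean"
	}
	if _, err := time.ParseDuration(value); err == nil {
		return "duration"
	}
	if _, err := humanize.ParseBytes(value); err == nil {
		return "bytes"
	}
	return "string"
}

func main() {
	for _, v := range []string{"2419", "3.14", "true", "19.098s", "10MB", "gRPC"} {
		fmt.Printf("%-8s -> %s\n", v, classify(v))
	}
}
```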
"datetime":"05/Sep/2024:16:13:56 +0000", "method": "PATCH", "request": "/api/loki/v1/push", "protocol":"HTTP/2.0", "status":200, "bytes":9664, "referer": "https://www.seniorbleeding-edge.net/exploit/robust/whiteboard"}`, + StructuredMetadata: debugDetectedFieldMetadata, + }, + { + Timestamp: now, + Line: `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "DELETE", "request": "/api/mimir/v1/push", "protocol":"HTTP/1.1", "status":200, "bytes":18688, "referer": "https://www.districtiterate.biz/synergistic/next-generation/extend"}`, + StructuredMetadata: debugDetectedFieldMetadata, + }, + { + Timestamp: now, + Line: `{"host":"66.134.9.30", "user-identifier":"-", "datetime":"05/Sep/2024:16:13:55 +0000", "method": "GET", "request": "/api/loki/v1/label/names", "protocol":"HTTP/1.1", "status":200, "bytes":9314, "referer": "https://www.dynamicimplement.info/enterprise/distributed/incentivize/strategic"}`, + StructuredMetadata: debugDetectedFieldMetadata, + }, + } + + nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json" }` + nginxMetric, err := parser.ParseMetric(nginxLbls) + require.NoError(t, err) + + nginxStream := push.Stream{ + Labels: nginxLbls, + Entries: nginxJSONLines, + Hash: nginxMetric.Hash(), + } + + t.Run("detects logfmt fields", func(t *testing.T) { + df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream})) + for _, expected := range []string{"ts", "caller", "tenant", "level", "method", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "logfmt", parsers[0]) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("detects json fields", func(t *testing.T) { + df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{nginxStream})) + for _, expected := range []string{"host", "user_identifier", "datetime", "method", "request", "protocol", "status", "bytes", "referer"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "json", parsers[0]) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("detects mixed fields", func(t *testing.T) { + df := parseDetectedFields( + uint32(20), + logqlmodel.Streams([]push.Stream{rulerStream, nginxStream}), + ) + + for _, expected := range []string{"ts", "caller", "tenant", "level", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1, "expected only logfmt parser for %s", expected) + require.Equal( + t, + "logfmt", + parsers[0], + "expected only logfmt parser for %s", + expected, + ) + } + + for _, expected := range []string{"host", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1, "expected only json parser for %s", expected) + require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected) + } + + // multiple parsers for fields that exist in both streams + for _, expected := range 
[]string{"method"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected) + require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected) + require.Contains(t, parsers, "json", "expected json parser for %s", expected) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("correctly applies _extracted for a single stream", func(t *testing.T) { + rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}` + rulerMetric, err := parser.ParseMetric(rulerLbls) + require.NoError(t, err) + + rulerStream := push.Stream{ + Labels: rulerLbls, + Entries: rulerLines, + Hash: rulerMetric.Hash(), + } + + df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream})) + for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "method", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "logfmt", parsers[0]) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("correctly applies _extracted for multiple streams", func(t *testing.T) { + rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}` + rulerMetric, err := parser.ParseMetric(rulerLbls) + require.NoError(t, err) + + rulerStream := push.Stream{ + Labels: rulerLbls, + Entries: rulerLines, + Hash: rulerMetric.Hash(), + } + + nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json", host="localhost"}` + nginxMetric, err := parser.ParseMetric(nginxLbls) + require.NoError(t, err) + + nginxStream := push.Stream{ + Labels: nginxLbls, + Entries: nginxJSONLines, + Hash: nginxMetric.Hash(), + } + + df := parseDetectedFields( + uint32(20), + logqlmodel.Streams([]push.Stream{rulerStream, nginxStream}), + ) + for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "logfmt", parsers[0]) + } + + for _, expected := range []string{"host_extracted", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1, "expected only json parser for %s", expected) + require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected) + } + + // multiple parsers for fields that exist in both streams + for _, expected := range []string{"method"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected) + require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected) + require.Contains(t, parsers, "json", "expected json parser for %s", expected) + } + + // no parsers for structed metadata + for _, expected := range 
[]string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + }) + + t.Run("when parsers are supplied", func(t *testing.T) { + infoDetectdFiledMetadata := []push.LabelAdapter{ + { + Name: "detected_level", + Value: "info", + }, + } + + parsedRulerFields := func(ts, tenant, duration string) []push.LabelAdapter { + return []push.LabelAdapter{ + { + Name: "ts", + Value: ts, + }, + { + Name: "caller", + Value: "grpc_logging.go:66", + }, + { + Name: "tenant", + Value: tenant, + }, + { + Name: "level", + Value: "info", + }, + { + Name: "method", + Value: "/cortex.Ingester/Push", + }, + { + Name: "duration", + Value: duration, + }, + { + Name: "msg", + Value: "gRPC", + }, + } + } + + rulerLbls := labels.FromStrings( + "cluster", "us-east-1", + "namespace", "mimir-dev", + "pod", "mimir-ruler-nfb37", + "service_name", "mimir-ruler", + ) + + rulerStreams := []push.Stream{} + streamLbls := logql_log.NewBaseLabelsBuilder().ForLabels(rulerLbls, rulerLbls.Hash()) + + for _, rulerFields := range [][]push.LabelAdapter{ + parsedRulerFields( + "2024-09-05T15:36:38.757788067Z", + "2419", + "19.098s", + ), + parsedRulerFields( + "2024-09-05T15:36:38.698375619Z", + "29", + "5.471s", + ), + parsedRulerFields( + "2024-09-05T15:36:38.629424175Z", + "2919", + "29.234s", + ), + } { + streamLbls.Reset() + + var ts, tenant, duration push.LabelAdapter + for _, field := range rulerFields { + switch field.Name { + case "ts": + ts = field + case "tenant": + tenant = field + case "duration": + duration = field + } + + streamLbls.Add(log.ParsedLabel, labels.Label{Name: field.Name, Value: field.Value}) + } + + rulerStreams = append(rulerStreams, push.Stream{ + Labels: streamLbls.LabelsResult().String(), + Entries: []push.Entry{ + { + Timestamp: now, + Line: fmt.Sprintf( + "ts=%s caller=grpc_logging.go:66 tenant=%s level=info method=/cortex.Ingester/Push duration=%s msg=gRPC", + ts.Value, + tenant.Value, + duration.Value, + ), + StructuredMetadata: infoDetectdFiledMetadata, + Parsed: rulerFields, + }, + }, + }) + } + + debugDetectedFieldMetadata := []push.LabelAdapter{ + { + Name: "detected_level", + Value: "debug", + }, + } + + parsedNginxFields := func(host, userIdentifier, datetime, method, request, protocol, status, bytes, referer string) []push.LabelAdapter { + return []push.LabelAdapter{ + { + Name: "host", + Value: host, + }, + { + Name: "user_identifier", + Value: userIdentifier, + }, + { + Name: "datetime", + Value: datetime, + }, + { + Name: "method", + Value: method, + }, + { + Name: "request", + Value: request, + }, + { + Name: "protocol", + Value: protocol, + }, + { + Name: "status", + Value: status, + }, + { + Name: "bytes", + Value: bytes, + }, + { + Name: "referer", + Value: referer, + }, + } + } + + nginxLbls := labels.FromStrings( + "cluster", "eu-west-1", + "level", "debug", + "namespace", "gateway", + "pod", "nginx-json-oghco", + "service_name", "nginx-json", + ) + + nginxStreams := []push.Stream{} + nginxStreamLbls := logql_log.NewBaseLabelsBuilder().ForLabels(nginxLbls, nginxLbls.Hash()) + + for _, nginxFields := range [][]push.LabelAdapter{ + parsedNginxFields( + "100.117.38.203", + "nadre3722", + "05/Sep/2024:16:13:56 +0000", + "PATCH", + "/api/loki/v1/push", + "HTTP/2.0", + "200", + "9664", + "https://www.seniorbleeding-edge.net/exploit/robust/whiteboard", + ), + parsedNginxFields( + "66.134.9.30", + "-", + "05/Sep/2024:16:13:55 +0000", + "DELETE", + "/api/mimir/v1/push", + "HTTP/1.1", + "200", + "18688", + 
"https://www.districtiterate.biz/synergistic/next-generation/extend", + ), + parsedNginxFields( + "66.134.9.30", + "-", + "05/Sep/2024:16:13:55 +0000", + "GET", + "/api/loki/v1/label/names", + "HTTP/1.1", + "200", + "9314", + "https://www.dynamicimplement.info/enterprise/distributed/incentivize/strategic", + ), + } { + nginxStreamLbls.Reset() + + var host, userIdentifier, datetime, method, request, protocol, status, bytes, referer push.LabelAdapter + for _, field := range nginxFields { + switch field.Name { + case "host": + host = field + case "user_identifier": + userIdentifier = field + case "datetime": + datetime = field + case "method": + method = field + case "request": + request = field + case "protocol": + protocol = field + case "status": + status = field + case "bytes": + bytes = field + case "referer": + referer = field + } + + nginxStreamLbls.Add( + log.ParsedLabel, + labels.Label{Name: field.Name, Value: field.Value}, + ) + } + + nginxStreams = append(nginxStreams, push.Stream{ + Labels: nginxStreamLbls.LabelsResult().String(), + Entries: []push.Entry{ + { + Timestamp: now, + Line: fmt.Sprintf( + `{"host":"%s", "user-identifier":"%s", "datetime":"%s", "method": "%s", "request": "%s", "protocol":"%s", "status":%s, "bytes":%s, "referer": ":%s"}`, + host.Value, + userIdentifier.Value, + datetime.Value, + method.Value, + request.Value, + protocol.Value, + status.Value, + bytes.Value, + referer.Value, + ), + StructuredMetadata: debugDetectedFieldMetadata, + Parsed: nginxFields, + }, + }, + }) + } + + t.Run("detect logfmt fields", func(t *testing.T) { + df := parseDetectedFields(uint32(15), logqlmodel.Streams(rulerStreams)) + for _, expected := range []string{"ts", "caller", "tenant", "level", "method", "duration", "msg"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "logfmt", parsers[0]) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("detect json fields", func(t *testing.T) { + df := parseDetectedFields(uint32(15), logqlmodel.Streams(nginxStreams)) + for _, expected := range []string{"host", "user_identifier", "datetime", "method", "request", "protocol", "status", "bytes", "referer"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 1) + require.Equal(t, "json", parsers[0]) + } + + // no parsers for structed metadata + for _, expected := range []string{"detected_level"} { + require.Contains(t, df, expected) + parsers := df[expected].parsers + + require.Len(t, parsers, 0) + } + }) + + t.Run("detect mixed fields", func(t *testing.T) { + streams := logqlmodel.Streams(rulerStreams) + streams = append(streams, nginxStreams...) 
+			df := parseDetectedFields(
+				uint32(20),
+				streams,
+			)
+
+			for _, expected := range []string{"ts", "caller", "tenant", "level", "duration", "msg"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1, "expected only logfmt parser for %s", expected)
+				require.Equal(
+					t,
+					"logfmt",
+					parsers[0],
+					"expected only logfmt parser for %s",
+					expected,
+				)
+			}
+
+			for _, expected := range []string{"host", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1, "expected only json parser for %s", expected)
+				require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected)
+			}
+
+			// multiple parsers for fields that exist in both streams
+			for _, expected := range []string{"method"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected)
+				require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected)
+				require.Contains(t, parsers, "json", "expected json parser for %s", expected)
+			}
+
+			// no parsers for structured metadata
+			for _, expected := range []string{"detected_level"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 0)
+			}
+		})
+
+		t.Run("correctly applies _extracted for a single stream", func(t *testing.T) {
+			rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}`
+			rulerMetric, err := parser.ParseMetric(rulerLbls)
+			require.NoError(t, err)
+
+			rulerStream := push.Stream{
+				Labels: rulerLbls,
+				Entries: []push.Entry{
+					{
+						Timestamp:          now,
+						Line:               "ts=2024-09-05T15:36:38.757788067Z caller=grpc_logging.go:66 tenant=2419 level=info method=/cortex.Ingester/Push duration=19.098s msg=gRPC",
+						StructuredMetadata: infoDetectdFiledMetadata,
+						Parsed: []push.LabelAdapter{
+							{
+								Name:  "ts",
+								Value: "2024-09-05T15:36:38.757788067Z",
+							},
+							{
+								Name:  "caller_extracted",
+								Value: "grpc_logging.go:66",
+							},
+							{
+								Name:  "tenant_extracted",
+								Value: "2419",
+							},
+							{
+								Name:  "level",
+								Value: "info",
+							},
+							{
+								Name:  "method",
+								Value: "/cortex.Ingester/Push",
+							},
+							{
+								Name:  "duration",
+								Value: "19.098s",
+							},
+							{
+								Name:  "msg",
+								Value: "gRPC",
+							},
+						},
+					},
+				},
+				Hash: rulerMetric.Hash(),
+			}
+
+			df := parseDetectedFields(uint32(15), logqlmodel.Streams([]push.Stream{rulerStream}))
+			for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "method", "duration", "msg"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1)
+				require.Equal(t, "logfmt", parsers[0])
+			}
+
+			// no parsers for structured metadata
+			for _, expected := range []string{"detected_level"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 0)
+			}
+		})
+
+		t.Run("correctly applies _extracted for multiple streams", func(t *testing.T) {
+			rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house"}`
+			rulerMetric, err := parser.ParseMetric(rulerLbls)
+			require.NoError(t, err)
+
+			rulerStream := push.Stream{
+				Labels: rulerLbls,
+				Entries: []push.Entry{
+					{
+						Timestamp:          now,
+						Line:               "ts=2024-09-05T15:36:38.757788067Z caller=grpc_logging.go:66 tenant=2419 level=info method=/cortex.Ingester/Push duration=19.098s msg=gRPC",
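+						// caller and tenant collide with this stream's labels, so the
+						// parsed keys below carry the _extracted suffix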
+						StructuredMetadata: infoDetectdFiledMetadata,
+						Parsed: []push.LabelAdapter{
+							{
+								Name:  "ts",
+								Value: "2024-09-05T15:36:38.757788067Z",
+							},
+							{
+								Name:  "caller_extracted",
+								Value: "grpc_logging.go:66",
+							},
+							{
+								Name:  "tenant_extracted",
+								Value: "2419",
+							},
+							{
+								Name:  "level",
+								Value: "info",
+							},
+							{
+								Name:  "method",
+								Value: "/cortex.Ingester/Push",
+							},
+							{
+								Name:  "duration",
+								Value: "19.098s",
+							},
+							{
+								Name:  "msg",
+								Value: "gRPC",
+							},
+						},
+					},
+				},
+				Hash: rulerMetric.Hash(),
+			}
+
+			nginxLbls := `{ cluster="eu-west-1", level="debug", namespace="gateway", pod="nginx-json-oghco", service_name="nginx-json", host="localhost"}`
+			nginxMetric, err := parser.ParseMetric(nginxLbls)
+			require.NoError(t, err)
+
+			nginxStream := push.Stream{
+				Labels: nginxLbls,
+				Entries: []push.Entry{
+					{
+						Timestamp:          now,
+						Line:               `{"host":"100.117.38.203", "user-identifier":"nader3722", "datetime":"05/Sep/2024:16:13:56 +0000", "method": "PATCH", "request": "/api/loki/v1/push", "protocol":"HTTP/2.0", "status":200, "bytes":9664, "referer": "https://www.seniorbleeding-edge.net/exploit/robust/whiteboard"}`,
+						StructuredMetadata: debugDetectedFieldMetadata,
+						Parsed: []push.LabelAdapter{
+							{
+								Name:  "host_extracted",
+								Value: "100.117.38.203",
+							},
+							{
+								Name:  "user_identifier",
+								Value: "nader3722",
+							},
+							{
+								Name:  "datetime",
+								Value: "05/Sep/2024:16:13:56 +0000",
+							},
+							{
+								Name:  "method",
+								Value: "PATCH",
+							},
+							{
+								Name:  "request",
+								Value: "/api/loki/v1/push",
+							},
+							{
+								Name:  "protocol",
+								Value: "HTTP/2.0",
+							},
+							{
+								Name:  "status",
+								Value: "200",
+							},
+							{
+								Name:  "bytes",
+								Value: "9664",
+							},
+							{
+								Name:  "referer",
+								Value: "https://www.seniorbleeding-edge.net/exploit/robust/whiteboard",
+							},
+						},
+					},
+				},
+				Hash: nginxMetric.Hash(),
+			}
+
+			df := parseDetectedFields(
+				uint32(20),
+				logqlmodel.Streams([]push.Stream{rulerStream, nginxStream}),
+			)
+			for _, expected := range []string{"ts", "caller_extracted", "tenant_extracted", "level", "duration", "msg"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1)
+				require.Equal(t, "logfmt", parsers[0])
+			}
+
+			for _, expected := range []string{"host_extracted", "user_identifier", "datetime", "request", "protocol", "status", "bytes", "referer"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 1, "expected only json parser for %s", expected)
+				require.Equal(t, "json", parsers[0], "expected only json parser for %s", expected)
+			}
+
+			// multiple parsers for fields that exist in both streams
+			for _, expected := range []string{"method"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 2, "expected logfmt and json parser for %s", expected)
+				require.Contains(t, parsers, "logfmt", "expected logfmt parser for %s", expected)
+				require.Contains(t, parsers, "json", "expected json parser for %s", expected)
+			}
+
+			// no parsers for structured metadata
+			for _, expected := range []string{"detected_level"} {
+				require.Contains(t, df, expected)
+				parsers := df[expected].parsers
+
+				require.Len(t, parsers, 0)
+			}
+		})
+	})
+
+	t.Run("handles level in all the places", func(t *testing.T) {
+		rulerLbls := `{cluster="us-east-1", namespace="mimir-dev", pod="mimir-ruler-nfb37", service_name="mimir-ruler", tenant="42", caller="inside-the-house", level="debug"}`
+		rulerMetric, err := parser.ParseMetric(rulerLbls)
+		require.NoError(t, err)
+
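+		// three level signals compete here: the stream label level="debug", the
+		// structured metadata detected_level="debug", and the parsed level=info
+		// (surfaced as level_extracted); each should be counted independently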
rulerStream := push.Stream{ + Labels: rulerLbls, + Entries: []push.Entry{ + { + Timestamp: now, + Line: "ts=2024-09-05T15:36:38.757788067Z caller=grpc_logging.go:66 tenant=2419 level=info method=/cortex.Ingester/Push duration=19.098s msg=gRPC", + StructuredMetadata: []push.LabelAdapter{ + { + Name: "detected_level", + Value: "debug", + }, + }, + Parsed: []push.LabelAdapter{ + { + Name: "level_extracted", + Value: "info", + }, + }, + }, + }, + Hash: rulerMetric.Hash(), + } + + df := parseDetectedFields( + uint32(20), + logqlmodel.Streams([]push.Stream{rulerStream, rulerStream}), + ) + + detectedLevelField := df["detected_level"] + require.Len(t, detectedLevelField.parsers, 0) + require.Equal(t, uint64(1), detectedLevelField.sketch.Estimate()) + + levelField := df["level_extracted"] + require.Len(t, levelField.parsers, 1) + require.Contains(t, levelField.parsers, "logfmt") + require.Equal(t, uint64(1), levelField.sketch.Estimate()) + }) +} + +func mockLogfmtStreamWithLabels(_ int, quantity int, lbls string) logproto.Stream { + entries := make([]logproto.Entry, 0, quantity) + streamLabels, err := syntax.ParseLabels(lbls) + if err != nil { + streamLabels = labels.EmptyLabels() + } + + lblBuilder := logql_log.NewBaseLabelsBuilder().ForLabels(streamLabels, streamLabels.Hash()) + logFmtParser := logql_log.NewLogfmtParser(false, false) + + // used for detected fields queries which are always BACKWARD + for i := quantity; i > 0; i-- { + line := fmt.Sprintf( + `message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t name=bar`, + i, + i, + (i * 10), + (i * 256), + float32(i*10.0), + (i%2 == 0), + ) + + entry := logproto.Entry{ + Timestamp: time.Unix(int64(i), 0), + Line: line, + } + _, logfmtSuccess := logFmtParser.Process(0, []byte(line), lblBuilder) + if logfmtSuccess { + entry.Parsed = logproto.FromLabelsToLabelAdapters(lblBuilder.LabelsResult().Parsed()) + } + entries = append(entries, entry) + } + + return logproto.Stream{ + Entries: entries, + Labels: lbls, + } +} + +func mockLogfmtStreamWithLabelsAndStructuredMetadata( + from int, + quantity int, + lbls string, +) logproto.Stream { + var entries []logproto.Entry + metadata := push.LabelsAdapter{ + { + Name: "constant", + Value: "constant", + }, + } + + for i := from; i < from+quantity; i++ { + metadata = append(metadata, push.LabelAdapter{ + Name: "variable", + Value: fmt.Sprintf("value%d", i), + }) + } + + streamLabels, err := syntax.ParseLabels(lbls) + if err != nil { + streamLabels = labels.EmptyLabels() + } + + lblBuilder := logql_log.NewBaseLabelsBuilder().ForLabels(streamLabels, streamLabels.Hash()) + logFmtParser := logql_log.NewLogfmtParser(false, false) + + // used for detected fields queries which are always BACKWARD + for i := quantity; i > 0; i-- { + line := fmt.Sprintf( + `message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t name=bar`, + i, + i, + (i * 10), + (i * 256), + float32(i*10.0), + (i%2 == 0), + ) + + entry := logproto.Entry{ + Timestamp: time.Unix(int64(i), 0), + Line: line, + StructuredMetadata: metadata, + } + _, logfmtSuccess := logFmtParser.Process(0, []byte(line), lblBuilder) + if logfmtSuccess { + entry.Parsed = logproto.FromLabelsToLabelAdapters(lblBuilder.LabelsResult().Parsed()) + } + entries = append(entries, entry) + } + + return logproto.Stream{ + Labels: lbls, + Entries: entries, + } +} + +func TestQuerier_DetectedFields(t *testing.T) { + limits := fakeLimits{ + maxSeries: math.MaxInt32, + maxQueryParallelism: 1, + tsdbMaxQueryParallelism: 1, + 
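+		// the byte-read caps below are deliberately small; they are not exercised
+		// by these tests, since the limited and log handlers are mocked out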
maxQueryBytesRead: 1000, + maxQuerierBytesRead: 100, + } + + limitedHandler := func(stream logproto.Stream) base.Handler { + return base.HandlerFunc( + func(ctx context.Context, req base.Request) (base.Response, error) { + return &LokiResponse{ + Status: "success", + Data: LokiData{ + ResultType: loghttp.ResultTypeStream, + Result: []logproto.Stream{ + stream, + }, + }, + Direction: logproto.BACKWARD, + }, nil + }) + } + + logHandler := func(stream logproto.Stream) base.Handler { + return base.HandlerFunc( + func(ctx context.Context, req base.Request) (base.Response, error) { + return &LokiResponse{ + Status: "success", + Data: LokiData{ + ResultType: loghttp.ResultTypeStream, + Result: []logproto.Stream{ + stream, + }, + }, + Direction: logproto.BACKWARD, + }, nil + }) + } + + request := DetectedFieldsRequest{ + logproto.DetectedFieldsRequest{ + Start: time.Now().Add(-1 * time.Minute), + End: time.Now(), + Query: `{type="test"} | logfmt | json`, + LineLimit: 1000, + FieldLimit: 1000, + }, + "/loki/api/v1/detected_fields", + } + + handleRequest := func(handler base.Handler, request DetectedFieldsRequest) []*logproto.DetectedField { + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "test-tenant") + + resp, err := handler.Do(ctx, &request) + require.NoError(t, err) + + r, ok := resp.(*DetectedFieldsResponse) + require.True(t, ok) + + return r.Response.Fields + } + + t.Run("returns detected fields from queried logs", func(t *testing.T) { + handler := NewDetectedFieldsHandler( + limitedHandler(mockLogfmtStreamWithLabels(1, 5, `{type="test", name="foo"}`)), + logHandler(mockLogfmtStreamWithLabels(1, 5, `{type="test", name="foo"}`)), + limits, + ).Wrap(base.HandlerFunc(func(ctx context.Context, req base.Request) (base.Response, error) { + t.Fatal("should not be called") + return nil, nil + })) + + detectedFields := handleRequest(handler, request) + // log lines come from querier_mock_test.go + // message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t + assert.Len(t, detectedFields, 8) + expectedCardinality := map[string]uint64{ + "message": 5, + "count": 5, + "fake": 1, + "bytes": 5, + "duration": 5, + "percent": 5, + "even": 2, + "name_extracted": 1, + } + for _, d := range detectedFields { + card := expectedCardinality[d.Label] + assert.Equal(t, card, d.Cardinality, "Expected cardinality mismatch for: %s", d.Label) + } + }) + + t.Run("returns detected fields with structured metadata from queried logs", func(t *testing.T) { + handler := NewDetectedFieldsHandler( + limitedHandler(mockLogfmtStreamWithLabelsAndStructuredMetadata(1, 5, `{type="test", name="bob"}`)), + logHandler(mockLogfmtStreamWithLabelsAndStructuredMetadata(1, 5, `{type="test", name="bob"}`)), + limits, + ).Wrap(base.HandlerFunc(func(ctx context.Context, req base.Request) (base.Response, error) { + t.Fatal("should not be called") + return nil, nil + })) + + detectedFields := handleRequest(handler, request) + // log lines come from querier_mock_test.go + // message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t + assert.Len(t, detectedFields, 10) + expectedCardinality := map[string]uint64{ + "variable": 5, + "constant": 1, + "message": 5, + "count": 5, + "fake": 1, + "bytes": 5, + "duration": 5, + "percent": 5, + "even": 2, + "name_extracted": 1, + } + for _, d := range detectedFields { + card := expectedCardinality[d.Label] + assert.Equal(t, card, d.Cardinality, "Expected cardinality mismatch for: %s", d.Label) + } + }) + + t.Run("correctly identifies different field 
types", func(t *testing.T) { + handler := NewDetectedFieldsHandler( + limitedHandler(mockLogfmtStreamWithLabels(1, 2, `{type="test", name="foo"}`)), + logHandler(mockLogfmtStreamWithLabels(1, 2, `{type="test", name="foo"}`)), + limits, + ).Wrap(base.HandlerFunc(func(ctx context.Context, req base.Request) (base.Response, error) { + t.Fatal("should not be called") + return nil, nil + })) + + detectedFields := handleRequest(handler, request) + // log lines come from querier_mock_test.go + // message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t + assert.Len(t, detectedFields, 8) + + var messageField, countField, bytesField, durationField, floatField, evenField *logproto.DetectedField + for _, field := range detectedFields { + print(field.Label) + switch field.Label { + case "message": + messageField = field + case "count": + countField = field + case "bytes": + bytesField = field + case "duration": + durationField = field + case "percent": + floatField = field + case "even": + evenField = field + } + } + + assert.Equal(t, logproto.DetectedFieldString, messageField.Type) + assert.Equal(t, logproto.DetectedFieldInt, countField.Type) + assert.Equal(t, logproto.DetectedFieldBytes, bytesField.Type) + assert.Equal(t, logproto.DetectedFieldDuration, durationField.Type) + assert.Equal(t, logproto.DetectedFieldFloat, floatField.Type) + assert.Equal(t, logproto.DetectedFieldBoolean, evenField.Type) + }) + + t.Run( + "correctly identifies parser to use with logfmt and structured metadata", + func(t *testing.T) { + handler := NewDetectedFieldsHandler( + limitedHandler( + mockLogfmtStreamWithLabelsAndStructuredMetadata(1, 2, `{type="test"}`), + ), + logHandler(mockLogfmtStreamWithLabelsAndStructuredMetadata(1, 2, `{type="test"}`)), + limits, + ).Wrap(base.HandlerFunc(func(ctx context.Context, req base.Request) (base.Response, error) { + t.Fatal("should not be called") + return nil, nil + })) + + detectedFields := handleRequest(handler, request) + // log lines come from querier_mock_test.go + // message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t + assert.Len(t, detectedFields, 10) + + var messageField, countField, bytesField, durationField, floatField, evenField, constantField, variableField *logproto.DetectedField + for _, field := range detectedFields { + switch field.Label { + case "message": + messageField = field + case "count": + countField = field + case "bytes": + bytesField = field + case "duration": + durationField = field + case "percent": + floatField = field + case "even": + evenField = field + case "constant": + constantField = field + case "variable": + variableField = field + } + } + + assert.Equal(t, []string{"logfmt"}, messageField.Parsers) + assert.Equal(t, []string{"logfmt"}, countField.Parsers) + assert.Equal(t, []string{"logfmt"}, bytesField.Parsers) + assert.Equal(t, []string{"logfmt"}, durationField.Parsers) + assert.Equal(t, []string{"logfmt"}, floatField.Parsers) + assert.Equal(t, []string{"logfmt"}, evenField.Parsers) + assert.Equal(t, []string(nil), constantField.Parsers) + assert.Equal(t, []string(nil), variableField.Parsers) + }, + ) + + t.Run( + "adds _extracted suffix to detected fields that conflict with indexed labels", + func(t *testing.T) { + handler := NewDetectedFieldsHandler( + limitedHandler( + mockLogfmtStreamWithLabelsAndStructuredMetadata(1, 2, `{type="test", name="bob"}`), + ), + logHandler(mockLogfmtStreamWithLabelsAndStructuredMetadata(1, 2, `{type="test", name="bob"}`)), + limits, + 
).Wrap(base.HandlerFunc(func(ctx context.Context, req base.Request) (base.Response, error) { + t.Fatal("should not be called") + return nil, nil + })) + + detectedFields := handleRequest(handler, request) + // log lines come from querier_mock_test.go + // message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t + assert.Len(t, detectedFields, 10) + + var nameField *logproto.DetectedField + for _, field := range detectedFields { + switch field.Label { + case "name_extracted": + nameField = field + } + } + + assert.NotNil(t, nameField) + assert.Equal(t, "name_extracted", nameField.Label) + assert.Equal(t, logproto.DetectedFieldString, nameField.Type) + assert.Equal(t, []string{"logfmt"}, nameField.Parsers) + assert.Equal(t, uint64(1), nameField.Cardinality) + }, + ) +} + +// func BenchmarkQuerierDetectedFields(b *testing.B) { +// limits, _ := validation.NewOverrides(defaultLimitsTestConfig(), nil) +// ctx := user.InjectOrgID(context.Background(), "test") + +// conf := mockQuerierConfig() +// conf.IngesterQueryStoreMaxLookback = 0 + +// request := logproto.DetectedFieldsRequest{ +// Start: time.Now().Add(-1 * time.Minute), +// End: time.Now(), +// Query: `{type="test"}`, +// LineLimit: 1000, +// FieldLimit: 1000, +// } + +// store := newStoreMock() +// store.On("SelectLogs", mock.Anything, mock.Anything). +// Return(mockLogfmtStreamIterator(1, 2), nil) + +// queryClient := newQueryClientMock() +// queryClient.On("Recv"). +// Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, 2)}), nil) + +// ingesterClient := newQuerierClientMock() +// ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything). +// Return(queryClient, nil) + +// querier, _ := newQuerier( +// conf, +// mockIngesterClientConfig(), +// newIngesterClientMockFactory(ingesterClient), +// mockReadRingWithOneActiveIngester(), +// &mockDeleteGettter{}, +// store, limits) + +// b.ReportAllocs() +// b.ResetTimer() + +// for i := 0; i < b.N; i++ { +// _, err := querier.DetectedFields(ctx, &request) +// assert.NoError(b, err) +// } +// } diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 3b8031cb5e1ef..8e1c6a04948da 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -11,7 +11,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/httpgrpc" - "github.com/grafana/dskit/tenant" "github.com/grafana/dskit/user" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -30,7 +29,6 @@ import ( "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/util/httpreq" logutil "github.com/grafana/loki/v3/pkg/util/log" - "github.com/grafana/loki/v3/pkg/util/validation" ) const ( @@ -240,14 +238,11 @@ func NewMiddleware( } detectedFieldsTripperware, err := NewDetectedFieldsTripperware( - cfg, - log, limits, schema, - codec, - iqo, - metrics, - metricsNamespace) + limitedTripperware, + logFilterTripperware, + ) if err != nil { return nil, nil, err } @@ -1218,88 +1213,16 @@ func sharedIndexTripperware( // NewDetectedFieldsTripperware creates a new frontend tripperware responsible for handling detected field requests, which are basically log filter requests with a bit more processing. 
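+// It wires the detected-fields handler between the existing limited and
+// log-filter tripperwares rather than assembling its own split/retry chain,
+// so these requests reuse whatever middleware those tripperwares already apply.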
func NewDetectedFieldsTripperware( - cfg Config, - log log.Logger, limits Limits, schema config.SchemaConfig, - merger base.Merger, - iqo util.IngesterQueryOptions, - metrics *Metrics, - metricsNamespace string, + limitedTripperware base.Middleware, + logTripperware base.Middleware, ) (base.Middleware, error) { return base.MiddlewareFunc(func(next base.Handler) base.Handler { - splitter := newDefaultSplitter(limits, iqo) - - queryRangeMiddleware := []base.Middleware{ - StatsCollectorMiddleware(), - NewLimitsMiddleware(limits), - base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, limits, merger, splitter, metrics.SplitByMetrics), - } + limitedHandler := limitedTripperware.Wrap(next) + logHandler := logTripperware.Wrap(next) - if cfg.MaxRetries > 0 { - queryRangeMiddleware = append( - queryRangeMiddleware, base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics), - base.NewRetryMiddleware(log, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, metricsNamespace), - ) - } - - limitedRT := NewLimitedRoundTripper(next, limits, schema.Configs, queryRangeMiddleware...) - return NewSketchRemovingHandler(limitedRT, limits, splitter) + detectedFieldsHandler := NewDetectedFieldsHandler(limitedHandler, logHandler, limits) + return NewLimitedRoundTripper(next, limits, schema.Configs, detectedFieldsHandler) }), nil } - -// NewSketchRemovingHandler returns a handler that removes sketches from detected fields responses before -// returning them to the user. We only need sketches internally for calculating cardinality for split queries. -// We're already doing this sanitization in the merge code, so this handler catches non-split queries -// to make sure their sketches are also removed. -func NewSketchRemovingHandler(next queryrangebase.Handler, limits Limits, splitter splitter) queryrangebase.Handler { - return queryrangebase.HandlerFunc( - func(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { - res, err := next.Do(ctx, req) - if err != nil { - return nil, err - } - - resp, ok := res.(*DetectedFieldsResponse) - if !ok { - return res, nil - } - - tenantIDs, err := tenant.TenantIDs(ctx) - if err != nil { - return resp, nil - } - - interval := validation.SmallestPositiveNonZeroDurationPerTenant( - tenantIDs, - limits.QuerySplitDuration, - ) - - // sketeches get cleaned up in the merge code, so we only need catch the cases - // where no splitting happened - if interval == 0 { - return removeSketches(resp), nil - } - - intervals, err := splitter.split(time.Now().UTC(), tenantIDs, req, interval) - if err != nil || len(intervals) < 2 { - return removeSketches(resp), nil - } - - // must have been splits, so sketches are already removed - return resp, nil - }, - ) -} - -// removeSketches removes sketches and field limit from a detected fields response. -// this is only needed for queries that were not split. -func removeSketches(resp *DetectedFieldsResponse) *DetectedFieldsResponse { - for i := range resp.Response.Fields { - resp.Response.Fields[i].Sketch = nil - } - - resp.Response.FieldLimit = 0 - return resp -}
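+
+// NOTE: sketches and the field limit are presumably stripped in the shared
+// merge path now (the removed comment above pointed there), which is why the
+// post-split sanitization handler could be deleted outright.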