diff --git a/pkg/query-service/app/logparsingpipeline/model.go b/pkg/query-service/app/logparsingpipeline/model.go index 0c4da3df37..dc52f157b8 100644 --- a/pkg/query-service/app/logparsingpipeline/model.go +++ b/pkg/query-service/app/logparsingpipeline/model.go @@ -49,11 +49,10 @@ type PipelineOperator struct { Name string `json:"name,omitempty" yaml:"-"` // optional keys depending on the type - ParseTo string `json:"parse_to,omitempty" yaml:"parse_to,omitempty"` - Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"` - Regex string `json:"regex,omitempty" yaml:"regex,omitempty"` - ParseFrom string `json:"parse_from,omitempty" yaml:"parse_from,omitempty"` - Timestamp *TimestampParser `json:"timestamp,omitempty" yaml:"timestamp,omitempty"` + ParseTo string `json:"parse_to,omitempty" yaml:"parse_to,omitempty"` + Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"` + Regex string `json:"regex,omitempty" yaml:"regex,omitempty"` + ParseFrom string `json:"parse_from,omitempty" yaml:"parse_from,omitempty"` *TraceParser `yaml:",inline,omitempty"` Field string `json:"field,omitempty" yaml:"field,omitempty"` Value string `json:"value,omitempty" yaml:"value,omitempty"` @@ -63,6 +62,10 @@ type PipelineOperator struct { Routes *[]Route `json:"routes,omitempty" yaml:"routes,omitempty"` Fields []string `json:"fields,omitempty" yaml:"fields,omitempty"` Default string `json:"default,omitempty" yaml:"default,omitempty"` + + // time_parser fields. + Layout string `json:"layout,omitempty" yaml:"layout,omitempty"` + LayoutType string `json:"layout_type,omitempty" yaml:"layout_type,omitempty"` } type TimestampParser struct { diff --git a/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go index fa9e095de3..14ac63d3cc 100644 --- a/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go +++ b/pkg/query-service/app/logparsingpipeline/pipelineBuilder.go @@ -25,7 +25,11 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s continue } - operators := getOperators(v.Config) + operators, err := getOperators(v.Config) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to prepare operators") + } + if len(operators) == 0 { continue } @@ -68,7 +72,7 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s return processors, names, nil } -func getOperators(ops []PipelineOperator) []PipelineOperator { +func getOperators(ops []PipelineOperator) ([]PipelineOperator, error) { filteredOp := []PipelineOperator{} for i, operator := range ops { if operator.Enabled { @@ -106,6 +110,34 @@ func getOperators(ops []PipelineOperator) []PipelineOperator { } else if operator.Type == "trace_parser" { cleanTraceParser(&operator) + + } else if operator.Type == "time_parser" { + parseFromParts := strings.Split(operator.ParseFrom, ".") + parseFromPath := strings.Join(parseFromParts, "?.") + + operator.If = fmt.Sprintf(`%s != nil`, parseFromPath) + + if operator.LayoutType == "strptime" { + regex, err := RegexForStrptimeLayout(operator.Layout) + if err != nil { + return nil, fmt.Errorf("could not generate time_parser processor: %w", err) + } + + operator.If = fmt.Sprintf( + `%s && %s matches "%s"`, operator.If, parseFromPath, regex, + ) + } else if operator.LayoutType == "epoch" { + valueRegex := `^\\s*[0-9]+\\s*$` + if strings.Contains(operator.Layout, ".") { + valueRegex = `^\\s*[0-9]+\\.[0-9]+\\s*$` + } + + operator.If = fmt.Sprintf( + `%s && string(%s) matches "%s"`, operator.If, parseFromPath, valueRegex, + ) + + } + // TODO(Raj): Maybe add support for gotime too eventually } filteredOp = append(filteredOp, operator) @@ -113,7 +145,7 @@ func getOperators(ops []PipelineOperator) []PipelineOperator { filteredOp[len(filteredOp)-1].Output = "" } } - return filteredOp + return filteredOp, nil } func cleanTraceParser(operator *PipelineOperator) { diff --git a/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go b/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go index 562a140b5d..0ef3ff71a5 100644 --- a/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go +++ b/pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go @@ -2,6 +2,7 @@ package logparsingpipeline import ( "context" + "fmt" "strings" "testing" "time" @@ -198,7 +199,8 @@ var prepareProcessorTestData = []struct { func TestPreparePipelineProcessor(t *testing.T) { for _, test := range prepareProcessorTestData { Convey(test.Name, t, func() { - res := getOperators(test.Operators) + res, err := getOperators(test.Operators) + So(err, ShouldBeNil) So(res, ShouldResemble, test.Output) }) } @@ -256,11 +258,13 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { } } - testCases := []struct { + type pipelineTestCase struct { Name string Operator PipelineOperator NonMatchingLog model.SignozLog - }{ + } + + testCases := []pipelineTestCase{ { "regex processor should ignore log with missing field", PipelineOperator{ @@ -342,12 +346,82 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) { Field: "attributes.test", }, makeTestLog("mismatching log", map[string]string{}), + }, { + "time parser should ignore logs with missing field.", + PipelineOperator{ + ID: "time", + Type: "time_parser", + Enabled: true, + Name: "time parser", + ParseFrom: "attributes.test_timestamp", + LayoutType: "strptime", + Layout: "%Y-%m-%dT%H:%M:%S.%f%z", + }, + makeTestLog("mismatching log", map[string]string{}), + }, { + "time parser should ignore logs timestamp values that don't contain expected strptime layout.", + PipelineOperator{ + ID: "time", + Type: "time_parser", + Enabled: true, + Name: "time parser", + ParseFrom: "attributes.test_timestamp", + LayoutType: "strptime", + Layout: "%Y-%m-%dT%H:%M:%S.%f%z", + }, + makeTestLog("mismatching log", map[string]string{ + "test_timestamp": "2023-11-27T12:03:28A239907+0530", + }), + }, { + "time parser should ignore logs timestamp values that don't contain an epoch", + PipelineOperator{ + ID: "time", + Type: "time_parser", + Enabled: true, + Name: "time parser", + ParseFrom: "attributes.test_timestamp", + LayoutType: "epoch", + Layout: "s", + }, + makeTestLog("mismatching log", map[string]string{ + "test_timestamp": "not-an-epoch", + }), }, // TODO(Raj): see if there is an error scenario for grok parser. // TODO(Raj): see if there is an error scenario for trace parser. // TODO(Raj): see if there is an error scenario for Add operator. } + // Some more timeparser test cases + epochLayouts := []string{"s", "ms", "us", "ns", "s.ms", "s.us", "s.ns"} + epochTestValues := []string{ + "1136214245", "1136214245123", "1136214245123456", + "1136214245123456789", "1136214245.123", + "1136214245.123456", "1136214245.123456789", + } + for _, epochLayout := range epochLayouts { + for _, testValue := range epochTestValues { + testCases = append(testCases, pipelineTestCase{ + fmt.Sprintf( + "time parser should ignore log with timestamp value %s that doesn't match layout type %s", + testValue, epochLayout, + ), + PipelineOperator{ + ID: "time", + Type: "time_parser", + Enabled: true, + Name: "time parser", + ParseFrom: "attributes.test_timestamp", + LayoutType: "epoch", + Layout: epochLayout, + }, + makeTestLog("mismatching log", map[string]string{ + "test_timestamp": testValue, + }), + }) + } + } + for _, testCase := range testCases { testPipelines := []Pipeline{makeTestPipeline([]PipelineOperator{testCase.Operator})} diff --git a/pkg/query-service/app/logparsingpipeline/postablePipeline.go b/pkg/query-service/app/logparsingpipeline/postablePipeline.go index 472303b527..1018f2f41d 100644 --- a/pkg/query-service/app/logparsingpipeline/postablePipeline.go +++ b/pkg/query-service/app/logparsingpipeline/postablePipeline.go @@ -8,6 +8,7 @@ import ( v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr" + "golang.org/x/exp/slices" ) // PostablePipelines are a list of user defined pielines @@ -164,6 +165,39 @@ func isValidOperator(op PipelineOperator) error { if len(op.Fields) == 0 { return fmt.Errorf(fmt.Sprintf("fields of %s retain operator cannot be empty", op.ID)) } + + case "time_parser": + if op.ParseFrom == "" { + return fmt.Errorf("parse from of time parsing processor %s cannot be empty", op.ID) + } + if op.LayoutType != "epoch" && op.LayoutType != "strptime" { + // TODO(Raj): Maybe add support for gotime format + return fmt.Errorf( + "invalid format type '%s' of time parsing processor %s", op.LayoutType, op.ID, + ) + } + if op.Layout == "" { + return fmt.Errorf(fmt.Sprintf("format can not be empty for time parsing processor %s", op.ID)) + } + + validEpochLayouts := []string{"s", "ms", "us", "ns", "s.ms", "s.us", "s.ns"} + if op.LayoutType == "epoch" && !slices.Contains(validEpochLayouts, op.Layout) { + return fmt.Errorf( + "invalid epoch format '%s' of time parsing processor %s", op.LayoutType, op.ID, + ) + } + + // TODO(Raj): Add validation for strptime layouts via + // collector simulator maybe. + if op.LayoutType == "strptime" { + _, err := RegexForStrptimeLayout(op.Layout) + if err != nil { + return fmt.Errorf( + "invalid strptime format '%s' of time parsing processor %s: %w", op.LayoutType, op.ID, err, + ) + } + } + default: return fmt.Errorf(fmt.Sprintf("operator type %s not supported for %s, use one of (grok_parser, regex_parser, copy, move, add, remove, trace_parser, retain)", op.Type, op.ID)) } diff --git a/pkg/query-service/app/logparsingpipeline/postablePipeline_test.go b/pkg/query-service/app/logparsingpipeline/postablePipeline_test.go index d2f9ec9b09..8f8a6d9c48 100644 --- a/pkg/query-service/app/logparsingpipeline/postablePipeline_test.go +++ b/pkg/query-service/app/logparsingpipeline/postablePipeline_test.go @@ -275,6 +275,57 @@ var operatorTest = []struct { }, }, IsValid: false, + }, { + Name: "Timestamp Parser - valid", + Operator: PipelineOperator{ + ID: "time", + Type: "time_parser", + ParseFrom: "attributes.test_timestamp", + LayoutType: "epoch", + Layout: "s", + }, + IsValid: true, + }, { + Name: "Timestamp Parser - invalid - bad parsefrom attribute", + Operator: PipelineOperator{ + ID: "time", + Type: "time_parser", + ParseFrom: "timestamp", + LayoutType: "epoch", + Layout: "s", + }, + IsValid: false, + }, { + Name: "Timestamp Parser - unsupported layout_type", + Operator: PipelineOperator{ + ID: "time", + Type: "time_parser", + ParseFrom: "attributes.test_timestamp", + // TODO(Raj): Maybe add support for gotime format + LayoutType: "gotime", + Layout: "Mon Jan 2 15:04:05 -0700 MST 2006", + }, + IsValid: false, + }, { + Name: "Timestamp Parser - invalid epoch layout", + Operator: PipelineOperator{ + ID: "time", + Type: "time_parser", + ParseFrom: "attributes.test_timestamp", + LayoutType: "epoch", + Layout: "%Y-%m-%d", + }, + IsValid: false, + }, { + Name: "Timestamp Parser - invalid strptime layout", + Operator: PipelineOperator{ + ID: "time", + Type: "time_parser", + ParseFrom: "attributes.test_timestamp", + LayoutType: "strptime", + Layout: "%U", + }, + IsValid: false, }, } diff --git a/pkg/query-service/app/logparsingpipeline/preview_test.go b/pkg/query-service/app/logparsingpipeline/preview_test.go index a7fa51732b..a7f01866b7 100644 --- a/pkg/query-service/app/logparsingpipeline/preview_test.go +++ b/pkg/query-service/app/logparsingpipeline/preview_test.go @@ -145,7 +145,7 @@ func TestPipelinePreview(t *testing.T) { } -func TestGrokParsingPreview(t *testing.T) { +func TestGrokParsingProcessor(t *testing.T) { require := require.New(t) testPipelines := []Pipeline{ @@ -207,7 +207,7 @@ func TestGrokParsingPreview(t *testing.T) { require.Equal("route/server.go:71", processed.Attributes_string["location"]) } -func TestTraceParsingPreview(t *testing.T) { +func TestTraceParsingProcessor(t *testing.T) { require := require.New(t) testPipelines := []Pipeline{ diff --git a/pkg/query-service/app/logparsingpipeline/time_parser.go b/pkg/query-service/app/logparsingpipeline/time_parser.go new file mode 100644 index 0000000000..a0ec384867 --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/time_parser.go @@ -0,0 +1,120 @@ +package logparsingpipeline + +import ( + "errors" + "fmt" + "regexp" + "strings" +) + +// Regex for strptime format placeholders supported by the time parser. +// Used for defining if conditions on time parsing operators so they do not +// spam collector logs when encountering values that can't be parsed. +// +// Based on ctimeSubstitutes defined in https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/timeutils/internal/ctimefmt/ctimefmt.go#L22 +// +// TODO(Raj): Maybe make the expressions tighter. +var ctimeRegex = map[string]string{ + // %Y - Year, zero-padded (0001, 0002, ..., 2019, 2020, ..., 9999) + "%Y": "[0-9]{4}", + // %y - Year, last two digits, zero-padded (01, ..., 99) + "%y": "[0-9]{2}", + // %m - Month as a decimal number (01, 02, ..., 12) + "%m": "[0-9]{2}", + // %o - Month as a space-padded number ( 1, 2, ..., 12) + "%o": "_[0-9]", + // %q - Month as a unpadded number (1,2,...,12) + "%q": "[0-9]", + // %b, %h - Abbreviated month name (Jan, Feb, ...) + "%b": "[a-zA-Z]*?", + "%h": "[a-zA-Z]*?", + // %B - Full month name (January, February, ...) + "%B": "[a-zA-Z]*?", + // %d - Day of the month, zero-padded (01, 02, ..., 31) + "%d": "[0-9]{2}", + // %e - Day of the month, space-padded ( 1, 2, ..., 31) + "%e": "_[0-9]", + // %g - Day of the month, unpadded (1,2,...,31) + "%g": "[0-9]", + // %a - Abbreviated weekday name (Sun, Mon, ...) + "%a": "[a-zA-Z]*?", + // %A - Full weekday name (Sunday, Monday, ...) + "%A": "[a-zA-Z]*?", + // %H - Hour (24-hour clock) as a zero-padded decimal number (00, ..., 24) + "%H": "[0-9]{2}", + // %l - Hour (12-hour clock: 0, ..., 12) + "%l": "[0-9]{1-2}", + // %I - Hour (12-hour clock) as a zero-padded decimal number (00, ..., 12) + "%I": "[0-9]{2}", + // %p - Locale’s equivalent of either AM or PM + "%p": "(AM|PM)", + // %P - Locale’s equivalent of either am or pm + "%P": "(am|pm)", + // %M - Minute, zero-padded (00, 01, ..., 59) + "%M": "[0-9]{2}", + // %S - Second as a zero-padded decimal number (00, 01, ..., 59) + "%S": "[0-9]{2}", + // %L - Millisecond as a decimal number, zero-padded on the left (000, 001, ..., 999) + "%L": "[0-9]*?", + // %f - Microsecond as a decimal number, zero-padded on the left (000000, ..., 999999) + "%f": "[0-9]*?", + // %s - Nanosecond as a decimal number, zero-padded on the left (000000, ..., 999999) + "%s": "[0-9]*?", + // %Z - Timezone name or abbreviation or empty (UTC, EST, CST) + "%Z": "[a-zA-Z]*?", + // %z - UTC offset in the form ±HHMM[SS[.ffffff]] or empty(+0000, -0400) + "%z": "[-+][0-9]*?", + // Weekday as a decimal number, where 0 is Sunday and 6 is Saturday. + "%w": "[-+][0-9]*?", + "%i": "[-+][0-9]*?", + "%j": "[-+][0-9]{2}:[0-9]{2}", + "%k": "[-+][0-9]{2}:[0-9]{2}:[0-9]{2}", + // %D, %x - Short MM/DD/YY date, equivalent to %m/%d/%y + "%D": "[0-9]{2}/[0-9]{2}/[0-9]{4}", + // %D, %x - Short MM/DD/YY date, equivalent to %m/%d/%y + "%x": "[0-9]{2}/[0-9]{2}/[0-9]{4}", + // %F - Short YYYY-MM-DD date, equivalent to %Y-%m-%d + "%F": "[0-9]{4}-[0-9]{2}-[0-9]{2}", + // %T, %X - ISO 8601 time format (HH:MM:SS), equivalent to %H:%M:%S + "%T": "[0-9]{2}:[0-9]{2}:[0-9]{2}", + // %T, %X - ISO 8601 time format (HH:MM:SS), equivalent to %H:%M:%S + "%X": "[0-9]{2}:[0-9]{2}:[0-9]{2}", + // %r - 12-hour clock time (02:55:02 pm) + "%r": "[0-9]{2}:[0-9]{2}:[0-9]{2} (am|pm)", + // %R - 24-hour HH:MM time, equivalent to %H:%M + "%R": "[0-9]{2}:[0-9]{2}", + // %n - New-line character ('\n') + "%n": "\n", + // %t - Horizontal-tab character ('\t') + "%t": "\t", + // %% - A % sign + "%%": "%", + // %c - Date and time representation (Mon Jan 02 15:04:05 2006) + "%c": "[a-zA-Z]{3} [a-zA-Z]{3} [0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2} [0-9]{4}", +} + +func RegexForStrptimeLayout(layout string) (string, error) { + layoutRegex := layout + for _, regexSpecialChar := range []string{ + ".", "+", "*", "?", "^", "$", "(", ")", "[", "]", "{", "}", "|", `\`, + } { + layoutRegex = strings.ReplaceAll(layoutRegex, regexSpecialChar, `\`+regexSpecialChar) + } + + var errs []error + replaceStrptimeDirectiveWithRegex := func(directive string) string { + if regex, ok := ctimeRegex[directive]; ok { + return regex + } + errs = append(errs, errors.New("unsupported ctimefmt directive: "+directive)) + return "" + } + + strptimeDirectiveRegexp := regexp.MustCompile(`%.`) + layoutRegex = strptimeDirectiveRegexp.ReplaceAllStringFunc(layoutRegex, replaceStrptimeDirectiveWithRegex) + if len(errs) != 0 { + return "", fmt.Errorf("couldn't generate regex for ctime format: %v", errs) + } + + return layoutRegex, nil +} diff --git a/pkg/query-service/app/logparsingpipeline/time_parser_test.go b/pkg/query-service/app/logparsingpipeline/time_parser_test.go new file mode 100644 index 0000000000..15c41bf73e --- /dev/null +++ b/pkg/query-service/app/logparsingpipeline/time_parser_test.go @@ -0,0 +1,136 @@ +package logparsingpipeline + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "testing" + "time" + + "github.com/antonmedv/expr" + "github.com/stretchr/testify/require" + "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func TestRegexForStrptimeLayout(t *testing.T) { + require := require.New(t) + + var testCases = []struct { + strptimeLayout string + str string + shouldMatch bool + }{ + { + strptimeLayout: "%Y-%m-%dT%H:%M:%S.%f%z", + str: "2023-11-26T12:03:28.239907+0530", + shouldMatch: true, + }, { + strptimeLayout: "%d-%m-%Y", + str: "26-11-2023", + shouldMatch: true, + }, { + strptimeLayout: "%d-%m-%Y", + str: "26-11-2023", + shouldMatch: true, + }, { + strptimeLayout: "%d/%m/%y", + str: "11/03/02", + shouldMatch: true, + }, { + strptimeLayout: "%A, %d. %B %Y %I:%M%p", + str: "Tuesday, 21. November 2006 04:30PM11/03/02", + shouldMatch: true, + }, { + strptimeLayout: "%A, %d. %B %Y %I:%M%p", + str: "some random text", + shouldMatch: false, + }, + } + + for _, test := range testCases { + regex, err := RegexForStrptimeLayout(test.strptimeLayout) + require.Nil(err, test.strptimeLayout) + + code := fmt.Sprintf(`"%s" matches "%s"`, test.str, regex) + program, err := expr.Compile(code) + require.Nil(err, test.strptimeLayout) + + output, err := expr.Run(program, map[string]string{}) + require.Nil(err, test.strptimeLayout) + require.Equal(output, test.shouldMatch, test.strptimeLayout) + + } +} + +func TestTimestampParsingProcessor(t *testing.T) { + require := require.New(t) + + testPipelines := []Pipeline{ + { + OrderId: 1, + Name: "pipeline1", + Alias: "pipeline1", + Enabled: true, + Filter: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "method", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + }, + Operator: "=", + Value: "GET", + }, + }, + }, + Config: []PipelineOperator{}, + }, + } + + var timestampParserOp PipelineOperator + err := json.Unmarshal([]byte(` + { + "orderId": 1, + "enabled": true, + "type": "time_parser", + "name": "Test timestamp parser", + "id": "test-timestamp-parser", + "parse_from": "attributes.test_timestamp", + "layout_type": "strptime", + "layout": "%Y-%m-%dT%H:%M:%S.%f%z" + } + `), ×tampParserOp) + require.Nil(err) + testPipelines[0].Config = append(testPipelines[0].Config, timestampParserOp) + + testTimestampStr := "2023-11-27T12:03:28.239907+0530" + testLog := makeTestLogEntry( + "test log", + map[string]string{ + "method": "GET", + "test_timestamp": testTimestampStr, + }, + ) + + result, collectorWarnAndErrorLogs, err := SimulatePipelinesProcessing( + context.Background(), + testPipelines, + []model.SignozLog{ + testLog, + }, + ) + require.Nil(err) + require.Equal(1, len(result)) + require.Equal(0, len(collectorWarnAndErrorLogs), strings.Join(collectorWarnAndErrorLogs, "\n")) + processed := result[0] + + expectedTimestamp, err := time.Parse("2006-01-02T15:04:05.999999-0700", testTimestampStr) + require.Nil(err) + + require.Equal(uint64(expectedTimestamp.UnixNano()), processed.Timestamp) + +}