Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: query service: logs pipelines timestamp parsing processor #4105

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions pkg/query-service/app/logparsingpipeline/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,10 @@ type PipelineOperator struct {
Name string `json:"name,omitempty" yaml:"-"`

// optional keys depending on the type
ParseTo string `json:"parse_to,omitempty" yaml:"parse_to,omitempty"`
Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"`
Regex string `json:"regex,omitempty" yaml:"regex,omitempty"`
ParseFrom string `json:"parse_from,omitempty" yaml:"parse_from,omitempty"`
Timestamp *TimestampParser `json:"timestamp,omitempty" yaml:"timestamp,omitempty"`
ParseTo string `json:"parse_to,omitempty" yaml:"parse_to,omitempty"`
Pattern string `json:"pattern,omitempty" yaml:"pattern,omitempty"`
Regex string `json:"regex,omitempty" yaml:"regex,omitempty"`
ParseFrom string `json:"parse_from,omitempty" yaml:"parse_from,omitempty"`
*TraceParser `yaml:",inline,omitempty"`
Field string `json:"field,omitempty" yaml:"field,omitempty"`
Value string `json:"value,omitempty" yaml:"value,omitempty"`
Expand All @@ -63,6 +62,10 @@ type PipelineOperator struct {
Routes *[]Route `json:"routes,omitempty" yaml:"routes,omitempty"`
Fields []string `json:"fields,omitempty" yaml:"fields,omitempty"`
Default string `json:"default,omitempty" yaml:"default,omitempty"`

// time_parser fields.
Layout string `json:"layout,omitempty" yaml:"layout,omitempty"`
LayoutType string `json:"layout_type,omitempty" yaml:"layout_type,omitempty"`
}

type TimestampParser struct {
Expand Down
38 changes: 35 additions & 3 deletions pkg/query-service/app/logparsingpipeline/pipelineBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s
continue
}

operators := getOperators(v.Config)
operators, err := getOperators(v.Config)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to prepare operators")
}

if len(operators) == 0 {
continue
}
Expand Down Expand Up @@ -68,7 +72,7 @@ func PreparePipelineProcessor(pipelines []Pipeline) (map[string]interface{}, []s
return processors, names, nil
}

func getOperators(ops []PipelineOperator) []PipelineOperator {
func getOperators(ops []PipelineOperator) ([]PipelineOperator, error) {
filteredOp := []PipelineOperator{}
for i, operator := range ops {
if operator.Enabled {
Expand Down Expand Up @@ -106,14 +110,42 @@ func getOperators(ops []PipelineOperator) []PipelineOperator {

} else if operator.Type == "trace_parser" {
cleanTraceParser(&operator)

} else if operator.Type == "time_parser" {
parseFromParts := strings.Split(operator.ParseFrom, ".")
parseFromPath := strings.Join(parseFromParts, "?.")

operator.If = fmt.Sprintf(`%s != nil`, parseFromPath)

if operator.LayoutType == "strptime" {
regex, err := RegexForStrptimeLayout(operator.Layout)
if err != nil {
return nil, fmt.Errorf("could not generate time_parser processor: %w", err)
}

operator.If = fmt.Sprintf(
`%s && %s matches "%s"`, operator.If, parseFromPath, regex,
)
} else if operator.LayoutType == "epoch" {
valueRegex := `^\\s*[0-9]+\\s*$`
if strings.Contains(operator.Layout, ".") {
valueRegex = `^\\s*[0-9]+\\.[0-9]+\\s*$`
}

operator.If = fmt.Sprintf(
`%s && string(%s) matches "%s"`, operator.If, parseFromPath, valueRegex,
)

}
// TODO(Raj): Maybe add support for gotime too eventually
}

filteredOp = append(filteredOp, operator)
} else if i == len(ops)-1 && len(filteredOp) != 0 {
filteredOp[len(filteredOp)-1].Output = ""
}
}
return filteredOp
return filteredOp, nil
}

func cleanTraceParser(operator *PipelineOperator) {
Expand Down
80 changes: 77 additions & 3 deletions pkg/query-service/app/logparsingpipeline/pipelineBuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logparsingpipeline

import (
"context"
"fmt"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -198,7 +199,8 @@ var prepareProcessorTestData = []struct {
func TestPreparePipelineProcessor(t *testing.T) {
for _, test := range prepareProcessorTestData {
Convey(test.Name, t, func() {
res := getOperators(test.Operators)
res, err := getOperators(test.Operators)
So(err, ShouldBeNil)
So(res, ShouldResemble, test.Output)
})
}
Expand Down Expand Up @@ -256,11 +258,13 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) {
}
}

testCases := []struct {
type pipelineTestCase struct {
Name string
Operator PipelineOperator
NonMatchingLog model.SignozLog
}{
}

testCases := []pipelineTestCase{
{
"regex processor should ignore log with missing field",
PipelineOperator{
Expand Down Expand Up @@ -342,12 +346,82 @@ func TestNoCollectorErrorsFromProcessorsForMismatchedLogs(t *testing.T) {
Field: "attributes.test",
},
makeTestLog("mismatching log", map[string]string{}),
}, {
"time parser should ignore logs with missing field.",
PipelineOperator{
ID: "time",
Type: "time_parser",
Enabled: true,
Name: "time parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "strptime",
Layout: "%Y-%m-%dT%H:%M:%S.%f%z",
},
makeTestLog("mismatching log", map[string]string{}),
}, {
"time parser should ignore logs timestamp values that don't contain expected strptime layout.",
PipelineOperator{
ID: "time",
Type: "time_parser",
Enabled: true,
Name: "time parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "strptime",
Layout: "%Y-%m-%dT%H:%M:%S.%f%z",
},
makeTestLog("mismatching log", map[string]string{
"test_timestamp": "2023-11-27T12:03:28A239907+0530",
}),
}, {
"time parser should ignore logs timestamp values that don't contain an epoch",
PipelineOperator{
ID: "time",
Type: "time_parser",
Enabled: true,
Name: "time parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "epoch",
Layout: "s",
},
makeTestLog("mismatching log", map[string]string{
"test_timestamp": "not-an-epoch",
}),
},
// TODO(Raj): see if there is an error scenario for grok parser.
// TODO(Raj): see if there is an error scenario for trace parser.
// TODO(Raj): see if there is an error scenario for Add operator.
}

// Some more timeparser test cases
epochLayouts := []string{"s", "ms", "us", "ns", "s.ms", "s.us", "s.ns"}
epochTestValues := []string{
"1136214245", "1136214245123", "1136214245123456",
"1136214245123456789", "1136214245.123",
"1136214245.123456", "1136214245.123456789",
}
for _, epochLayout := range epochLayouts {
for _, testValue := range epochTestValues {
testCases = append(testCases, pipelineTestCase{
fmt.Sprintf(
"time parser should ignore log with timestamp value %s that doesn't match layout type %s",
testValue, epochLayout,
),
PipelineOperator{
ID: "time",
Type: "time_parser",
Enabled: true,
Name: "time parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "epoch",
Layout: epochLayout,
},
makeTestLog("mismatching log", map[string]string{
"test_timestamp": testValue,
}),
})
}
}

for _, testCase := range testCases {
testPipelines := []Pipeline{makeTestPipeline([]PipelineOperator{testCase.Operator})}

Expand Down
34 changes: 34 additions & 0 deletions pkg/query-service/app/logparsingpipeline/postablePipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

v3 "go.signoz.io/signoz/pkg/query-service/model/v3"
"go.signoz.io/signoz/pkg/query-service/queryBuilderToExpr"
"golang.org/x/exp/slices"
)

// PostablePipelines are a list of user defined pielines
Expand Down Expand Up @@ -164,6 +165,39 @@ func isValidOperator(op PipelineOperator) error {
if len(op.Fields) == 0 {
return fmt.Errorf(fmt.Sprintf("fields of %s retain operator cannot be empty", op.ID))
}

case "time_parser":
if op.ParseFrom == "" {
return fmt.Errorf("parse from of time parsing processor %s cannot be empty", op.ID)
}
if op.LayoutType != "epoch" && op.LayoutType != "strptime" {
// TODO(Raj): Maybe add support for gotime format
return fmt.Errorf(
"invalid format type '%s' of time parsing processor %s", op.LayoutType, op.ID,
)
}
if op.Layout == "" {
return fmt.Errorf(fmt.Sprintf("format can not be empty for time parsing processor %s", op.ID))
}

validEpochLayouts := []string{"s", "ms", "us", "ns", "s.ms", "s.us", "s.ns"}
if op.LayoutType == "epoch" && !slices.Contains(validEpochLayouts, op.Layout) {
return fmt.Errorf(
"invalid epoch format '%s' of time parsing processor %s", op.LayoutType, op.ID,
)
}

// TODO(Raj): Add validation for strptime layouts via
// collector simulator maybe.
if op.LayoutType == "strptime" {
_, err := RegexForStrptimeLayout(op.Layout)
if err != nil {
return fmt.Errorf(
"invalid strptime format '%s' of time parsing processor %s: %w", op.LayoutType, op.ID, err,
)
}
}

default:
return fmt.Errorf(fmt.Sprintf("operator type %s not supported for %s, use one of (grok_parser, regex_parser, copy, move, add, remove, trace_parser, retain)", op.Type, op.ID))
}
Expand Down
51 changes: 51 additions & 0 deletions pkg/query-service/app/logparsingpipeline/postablePipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,57 @@ var operatorTest = []struct {
},
},
IsValid: false,
}, {
Name: "Timestamp Parser - valid",
Operator: PipelineOperator{
ID: "time",
Type: "time_parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "epoch",
Layout: "s",
},
IsValid: true,
}, {
Name: "Timestamp Parser - invalid - bad parsefrom attribute",
Operator: PipelineOperator{
ID: "time",
Type: "time_parser",
ParseFrom: "timestamp",
LayoutType: "epoch",
Layout: "s",
},
IsValid: false,
}, {
Name: "Timestamp Parser - unsupported layout_type",
Operator: PipelineOperator{
ID: "time",
Type: "time_parser",
ParseFrom: "attributes.test_timestamp",
// TODO(Raj): Maybe add support for gotime format
LayoutType: "gotime",
Layout: "Mon Jan 2 15:04:05 -0700 MST 2006",
},
IsValid: false,
}, {
Name: "Timestamp Parser - invalid epoch layout",
Operator: PipelineOperator{
ID: "time",
Type: "time_parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "epoch",
Layout: "%Y-%m-%d",
},
IsValid: false,
}, {
Name: "Timestamp Parser - invalid strptime layout",
Operator: PipelineOperator{
ID: "time",
Type: "time_parser",
ParseFrom: "attributes.test_timestamp",
LayoutType: "strptime",
Layout: "%U",
},
IsValid: false,
},
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/query-service/app/logparsingpipeline/preview_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestPipelinePreview(t *testing.T) {

}

func TestGrokParsingPreview(t *testing.T) {
func TestGrokParsingProcessor(t *testing.T) {
require := require.New(t)

testPipelines := []Pipeline{
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestGrokParsingPreview(t *testing.T) {
require.Equal("route/server.go:71", processed.Attributes_string["location"])
}

func TestTraceParsingPreview(t *testing.T) {
func TestTraceParsingProcessor(t *testing.T) {
require := require.New(t)

testPipelines := []Pipeline{
Expand Down
Loading
Loading