From 33715c72b9325b8efbf2ea2822bbb34f0283d966 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Fri, 21 Jan 2022 06:57:46 +0000 Subject: [PATCH 01/16] [processor/transform] Add business logic for handling traces queries. --- .../internal/common/parser.go | 26 +- .../internal/common/parser_test.go | 27 +- .../internal/traces/condition.go | 54 ++ .../internal/traces/condition_test.go | 136 +++++ .../internal/traces/expression.go | 79 +++ .../internal/traces/expression_test.go | 101 ++++ .../internal/traces/functions.go | 60 ++ .../internal/traces/processor.go | 88 +++ .../internal/traces/processor_test.go | 238 ++++++++ .../internal/traces/registry.go | 86 +++ .../internal/traces/registry_test.go | 284 +++++++++ .../internal/traces/traces.go | 368 ++++++++++++ .../internal/traces/traces_test.go | 544 ++++++++++++++++++ 13 files changed, 2061 insertions(+), 30 deletions(-) create mode 100644 processor/transformprocessor/internal/traces/condition.go create mode 100644 processor/transformprocessor/internal/traces/condition_test.go create mode 100644 processor/transformprocessor/internal/traces/expression.go create mode 100644 processor/transformprocessor/internal/traces/expression_test.go create mode 100644 processor/transformprocessor/internal/traces/functions.go create mode 100644 processor/transformprocessor/internal/traces/processor.go create mode 100644 processor/transformprocessor/internal/traces/processor_test.go create mode 100644 processor/transformprocessor/internal/traces/registry.go create mode 100644 processor/transformprocessor/internal/traces/registry_test.go create mode 100644 processor/transformprocessor/internal/traces/traces.go create mode 100644 processor/transformprocessor/internal/traces/traces_test.go diff --git a/processor/transformprocessor/internal/common/parser.go b/processor/transformprocessor/internal/common/parser.go index 54f039b50598..f4b7db0b67d2 100644 --- a/processor/transformprocessor/internal/common/parser.go +++ b/processor/transformprocessor/internal/common/parser.go @@ -19,9 +19,9 @@ import ( "github.com/alecthomas/participle/v2/lexer" ) -// Query represents a parsed query. It is the entry point into the query DSL. +// ParsedQuery represents a parsed query. It is the entry point into the query DSL. // nolint:govet -type Query struct { +type ParsedQuery struct { Invocation Invocation `@@` Condition *Condition `( "where" @@ )?` } @@ -65,27 +65,21 @@ type Field struct { MapKey *string `( "[" @String "]" )?` } -func Parse(rawQueries []string) ([]Query, error) { +func Parse(raw string) (*ParsedQuery, error) { parser, err := newParser() if err != nil { - return []Query{}, err + return &ParsedQuery{}, err } - parsed := make([]Query, 0) - - for _, raw := range rawQueries { - query := Query{} - err = parser.ParseString("", raw, &query) - if err != nil { - return []Query{}, err - } - parsed = append(parsed, query) + parsed := &ParsedQuery{} + err = parser.ParseString("", raw, parsed) + if err != nil { + return nil, err } - return parsed, nil } -// newParser returns a parser that can be used to read a string into a Query. An error will be returned if the string +// newParser returns a parser that can be used to read a string into a ParsedQuery. An error will be returned if the string // is not formatted for the DSL. func newParser() (*participle.Parser, error) { lex := lexer.MustSimple([]lexer.Rule{ @@ -96,7 +90,7 @@ func newParser() (*participle.Parser, error) { {Name: `Operators`, Pattern: `==|!=|[,.()\[\]]`, Action: nil}, {Name: "whitespace", Pattern: `\s+`, Action: nil}, }) - return participle.Build(&Query{}, + return participle.Build(&ParsedQuery{}, participle.Lexer(lex), participle.Unquote("String"), participle.Elide("whitespace"), diff --git a/processor/transformprocessor/internal/common/parser_test.go b/processor/transformprocessor/internal/common/parser_test.go index 40e9e6411bc4..21d63023dd69 100644 --- a/processor/transformprocessor/internal/common/parser_test.go +++ b/processor/transformprocessor/internal/common/parser_test.go @@ -23,11 +23,11 @@ import ( func Test_parse(t *testing.T) { tests := []struct { query string - expected Query + expected *ParsedQuery }{ { query: `set("foo")`, - expected: Query{ + expected: &ParsedQuery{ Invocation: Invocation{ Function: "set", Arguments: []Value{ @@ -41,7 +41,7 @@ func Test_parse(t *testing.T) { }, { query: `met(1.2)`, - expected: Query{ + expected: &ParsedQuery{ Invocation: Invocation{ Function: "met", Arguments: []Value{ @@ -55,7 +55,7 @@ func Test_parse(t *testing.T) { }, { query: `fff(12)`, - expected: Query{ + expected: &ParsedQuery{ Invocation: Invocation{ Function: "fff", Arguments: []Value{ @@ -69,7 +69,7 @@ func Test_parse(t *testing.T) { }, { query: `set("foo", get(bear.honey))`, - expected: Query{ + expected: &ParsedQuery{ Invocation: Invocation{ Function: "set", Arguments: []Value{ @@ -102,7 +102,7 @@ func Test_parse(t *testing.T) { }, { query: `set(foo.attributes["bar"].cat, "dog")`, - expected: Query{ + expected: &ParsedQuery{ Invocation: Invocation{ Function: "set", Arguments: []Value{ @@ -132,7 +132,7 @@ func Test_parse(t *testing.T) { }, { query: `set(foo.attributes["bar"].cat, "dog") where name == "fido"`, - expected: Query{ + expected: &ParsedQuery{ Invocation: Invocation{ Function: "set", Arguments: []Value{ @@ -176,7 +176,7 @@ func Test_parse(t *testing.T) { }, { query: `set(foo.attributes["bar"].cat, "dog") where name != "fido"`, - expected: Query{ + expected: &ParsedQuery{ Invocation: Invocation{ Function: "set", Arguments: []Value{ @@ -220,7 +220,7 @@ func Test_parse(t *testing.T) { }, { query: `set ( foo.attributes[ "bar"].cat, "dog") where name=="fido"`, - expected: Query{ + expected: &ParsedQuery{ Invocation: Invocation{ Function: "set", Arguments: []Value{ @@ -264,7 +264,7 @@ func Test_parse(t *testing.T) { }, { query: `set("fo\"o")`, - expected: Query{ + expected: &ParsedQuery{ Invocation: Invocation{ Function: "set", Arguments: []Value{ @@ -280,10 +280,9 @@ func Test_parse(t *testing.T) { for _, tt := range tests { t.Run(tt.query, func(t *testing.T) { - parsed, err := Parse([]string{tt.query}) + parsed, err := Parse(tt.query) assert.NoError(t, err) - assert.Len(t, parsed, 1) - assert.Equal(t, tt.expected, parsed[0]) + assert.Equal(t, tt.expected, parsed) }) } } @@ -298,7 +297,7 @@ func Test_parse_failure(t *testing.T) { } for _, tt := range tests { t.Run(tt, func(t *testing.T) { - _, err := Parse([]string{tt}) + _, err := Parse(tt) assert.Error(t, err) }) } diff --git a/processor/transformprocessor/internal/traces/condition.go b/processor/transformprocessor/internal/traces/condition.go new file mode 100644 index 000000000000..e7d2a44ffb62 --- /dev/null +++ b/processor/transformprocessor/internal/traces/condition.go @@ -0,0 +1,54 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traces // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" + +import ( + "fmt" + + "go.opentelemetry.io/collector/model/pdata" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" +) + +func newConditionEvaluator(cond *common.Condition) (func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) bool, error) { + if cond == nil { + return func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) bool { + return true + }, nil + } + left, err := newGetter(cond.Left) + if err != nil { + return nil, err + } + right, err := newGetter(cond.Right) + if err != nil { + return nil, err + } + + switch cond.Op { + case "==": + return func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) bool { + a := left.get(span, il, resource) + b := right.get(span, il, resource) + return a == b + }, nil + case "!=": + return func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) bool { + return left.get(span, il, resource) != right.get(span, il, resource) + }, nil + } + + return nil, fmt.Errorf("unrecognized boolean operation %v", cond.Op) +} diff --git a/processor/transformprocessor/internal/traces/condition_test.go b/processor/transformprocessor/internal/traces/condition_test.go new file mode 100644 index 000000000000..c69dd90697fc --- /dev/null +++ b/processor/transformprocessor/internal/traces/condition_test.go @@ -0,0 +1,136 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traces + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/model/pdata" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" +) + +func Test_newConditionEvaluator(t *testing.T) { + span := pdata.NewSpan() + span.SetName("bear") + tests := []struct { + name string + cond *common.Condition + matching pdata.Span + }{ + { + name: "literals match", + cond: &common.Condition{ + Left: common.Value{ + String: strp("hello"), + }, + Right: common.Value{ + String: strp("hello"), + }, + Op: "==", + }, + matching: span, + }, + { + name: "literals don't match", + cond: &common.Condition{ + Left: common.Value{ + String: strp("hello"), + }, + Right: common.Value{ + String: strp("goodbye"), + }, + Op: "!=", + }, + matching: span, + }, + { + name: "path expression matches", + cond: &common.Condition{ + Left: common.Value{ + Path: &common.Path{ + Fields: []common.Field{ + { + Name: "name", + }, + }, + }, + }, + Right: common.Value{ + String: strp("bear"), + }, + Op: "==", + }, + matching: span, + }, + { + name: "path expression not matches", + cond: &common.Condition{ + Left: common.Value{ + Path: &common.Path{ + Fields: []common.Field{ + { + Name: "name", + }, + }, + }, + }, + Right: common.Value{ + String: strp("bear"), + }, + Op: "==", + }, + matching: span, + }, + { + name: "no condition", + cond: nil, + matching: span, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + evaluate, err := newConditionEvaluator(tt.cond) + assert.NoError(t, err) + assert.True(t, evaluate(tt.matching, pdata.NewInstrumentationLibrary(), pdata.NewResource())) + }) + } + + t.Run("invalid", func(t *testing.T) { + _, err := newConditionEvaluator(&common.Condition{ + Left: common.Value{ + String: strp("bear"), + }, + Op: "<>", + Right: common.Value{ + String: strp("cat"), + }, + }) + assert.Error(t, err) + }) +} + +func strp(s string) *string { + return &s +} + +func intp(i int64) *int64 { + return &i +} + +func floatp(f float64) *float64 { + return &f +} diff --git a/processor/transformprocessor/internal/traces/expression.go b/processor/transformprocessor/internal/traces/expression.go new file mode 100644 index 000000000000..08a1e4c97ebc --- /dev/null +++ b/processor/transformprocessor/internal/traces/expression.go @@ -0,0 +1,79 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traces // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" + +import ( + "fmt" + + "go.opentelemetry.io/collector/model/pdata" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" +) + +// getter allows reading a value while processing traces. Note that data is not necessarily read from input +// telemetry but may be a literal value or a function invocation. +type getter interface { + get(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} +} + +// getSetter allows reading or writing a value to trace data. +type getSetter interface { + getter + set(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) +} + +// literal holds a literal value defined as part of a Query. It does not read from telemetry data. +type literal struct { + value interface{} +} + +func (l literal) get(_ pdata.Span, _ pdata.InstrumentationLibrary, _ pdata.Resource) interface{} { + return l.value +} + +func newGetter(val common.Value) (getter, error) { + if s := val.String; s != nil { + return &literal{ + value: *s, + }, nil + } else if f := val.Float; f != nil { + return &literal{ + value: *f, + }, nil + } else if i := val.Int; i != nil { + return &literal{ + value: *i, + }, nil + } + + if val.Path != nil { + return newPathGetSetter(val.Path.Fields) + } + + if val.Invocation == nil { + // In practice, can't happen since the DSL grammar guarantees one is set + return nil, fmt.Errorf("no value field set. This is a bug in the transformprocessor") + } + + call, err := newFunctionCall(*val.Invocation) + if err != nil { + return nil, err + } + return &pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return call(span, il, resource) + }, + }, nil +} diff --git a/processor/transformprocessor/internal/traces/expression_test.go b/processor/transformprocessor/internal/traces/expression_test.go new file mode 100644 index 000000000000..b91ad10ba86f --- /dev/null +++ b/processor/transformprocessor/internal/traces/expression_test.go @@ -0,0 +1,101 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traces + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/model/pdata" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" +) + +func hello() func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return "world" + } +} + +func Test_newGetter(t *testing.T) { + registerFunction("hello", hello) + defer unregisterFunction("hello") + + span := pdata.NewSpan() + span.SetName("bear") + tests := []struct { + name string + val common.Value + want interface{} + }{ + { + name: "string literal", + val: common.Value{ + String: strp("str"), + }, + want: "str", + }, + { + name: "float literal", + val: common.Value{ + Float: floatp(1.2), + }, + want: 1.2, + }, + { + name: "int literal", + val: common.Value{ + Int: intp(12), + }, + want: int64(12), + }, + { + name: "path expression", + val: common.Value{ + Path: &common.Path{ + Fields: []common.Field{ + { + Name: "name", + }, + }, + }, + }, + want: "bear", + }, + { + name: "function call", + val: common.Value{ + Invocation: &common.Invocation{ + Function: "hello", + }, + }, + want: "world", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + reader, err := newGetter(tt.val) + assert.NoError(t, err) + val := reader.get(span, pdata.NewInstrumentationLibrary(), pdata.NewResource()) + assert.Equal(t, tt.want, val) + }) + } + + t.Run("empty value", func(t *testing.T) { + _, err := newGetter(common.Value{}) + assert.Error(t, err) + }) +} diff --git a/processor/transformprocessor/internal/traces/functions.go b/processor/transformprocessor/internal/traces/functions.go new file mode 100644 index 000000000000..74ad33e6f542 --- /dev/null +++ b/processor/transformprocessor/internal/traces/functions.go @@ -0,0 +1,60 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traces // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" + +import ( + "go.opentelemetry.io/collector/model/pdata" +) + +func set(target getSetter, value getter) func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + val := value.get(span, il, resource) + if val != nil { + target.set(span, il, resource, val) + } + return nil + } +} + +func keep(target getSetter, keys []string) func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + keySet := make(map[string]struct{}, len(keys)) + for _, key := range keys { + keySet[key] = struct{}{} + } + + return func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + val := target.get(span, il, resource) + if val == nil { + return nil + } + + if attrs, ok := val.(pdata.AttributeMap); ok { + filtered := pdata.NewAttributeMap() + attrs.Range(func(key string, val pdata.AttributeValue) bool { + if _, ok := keySet[key]; ok { + filtered.Insert(key, val) + } + return true + }) + target.set(span, il, resource, filtered) + } + return nil + } +} + +func init() { + registerFunction("keep", keep) + registerFunction("set", set) +} diff --git a/processor/transformprocessor/internal/traces/processor.go b/processor/transformprocessor/internal/traces/processor.go new file mode 100644 index 000000000000..eee8f991ec4e --- /dev/null +++ b/processor/transformprocessor/internal/traces/processor.go @@ -0,0 +1,88 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traces // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" +) + +type Processor struct { + statements []Query + logger *zap.Logger +} + +// Query holds a top level Query for processing trace data. A Query is a combination of a function +// invocation and the condition to match telemetry for invoking the function. +type Query struct { + function func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} + condition func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) bool +} + +func NewProcessor(statements []Query, settings component.ProcessorCreateSettings) (*Processor, error) { + return &Processor{ + statements: statements, + logger: settings.Logger, + }, nil +} + +func (p *Processor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) { + process(td, p.statements) + return td, nil +} + +func process(td pdata.Traces, statements []Query) { + for i := 0; i < td.ResourceSpans().Len(); i++ { + rspans := td.ResourceSpans().At(i) + for j := 0; j < rspans.InstrumentationLibrarySpans().Len(); j++ { + il := rspans.InstrumentationLibrarySpans().At(j).InstrumentationLibrary() + spans := rspans.InstrumentationLibrarySpans().At(j).Spans() + for k := 0; k < spans.Len(); k++ { + span := spans.At(k) + + for _, statement := range statements { + if statement.condition(span, il, rspans.Resource()) { + statement.function(span, il, rspans.Resource()) + } + } + } + } + } +} + +func (s *Query) UnmarshalText(text []byte) error { + parsed, err := common.Parse(string(text)) + if err != nil { + return err + } + function, err := newFunctionCall(parsed.Invocation) + if err != nil { + return err + } + condition, err := newConditionEvaluator(parsed.Condition) + if err != nil { + return err + } + *s = Query{ + function: function, + condition: condition, + } + return nil +} diff --git a/processor/transformprocessor/internal/traces/processor_test.go b/processor/transformprocessor/internal/traces/processor_test.go new file mode 100644 index 000000000000..4921ee11c577 --- /dev/null +++ b/processor/transformprocessor/internal/traces/processor_test.go @@ -0,0 +1,238 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traces + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/model/pdata" +) + +var ( + TestSpanStartTime = time.Date(2020, 2, 11, 20, 26, 12, 321, time.UTC) + TestSpanStartTimestamp = pdata.NewTimestampFromTime(TestSpanStartTime) + + TestSpanEndTime = time.Date(2020, 2, 11, 20, 26, 13, 789, time.UTC) + TestSpanEndTimestamp = pdata.NewTimestampFromTime(TestSpanEndTime) +) + +func TestProcess(t *testing.T) { + tests := []struct { + query string + want func(td pdata.Traces) + }{ + { + query: `set(attributes["test"], "pass") where name == "operationA"`, + want: func(td pdata.Traces) { + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).Attributes().InsertString("test", "pass") + }, + }, + { + query: `set(attributes["test"], "pass") where resource.attributes["host.name"] == "localhost"`, + want: func(td pdata.Traces) { + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).Attributes().InsertString("test", "pass") + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(1).Attributes().InsertString("test", "pass") + }, + }, + { + query: `keep(attributes, "http.method") where name == "operationA"`, + want: func(td pdata.Traces) { + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).Attributes().Clear() + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).Attributes().InsertString("http.method", "get") + }, + }, + { + query: `set(status.code, 1) where attributes["http.path"] == "/health"`, + want: func(td pdata.Traces) { + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).Status().SetCode(pdata.StatusCodeOk) + td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(1).Status().SetCode(pdata.StatusCodeOk) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.query, func(t *testing.T) { + td := constructTraces() + statements, err := parse([]string{tt.query}) + assert.NoError(t, err) + processor, err := NewProcessor(statements, component.ProcessorCreateSettings{}) + assert.NoError(t, err) + + _, err = processor.ProcessTraces(context.Background(), td) + assert.NoError(t, err) + + exTd := constructTraces() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + +func BenchmarkTwoSpans(b *testing.B) { + tests := []struct { + name string + queries []string + }{ + { + name: "no processing", + queries: []string{}, + }, + { + name: "set attribute", + queries: []string{`set(attributes["test"], "pass") where name == "operationA"`}, + }, + { + name: "keep attribute", + queries: []string{`keep(attributes, "http.method") where name == "operationA"`}, + }, + { + name: "no match", + queries: []string{`keep(attributes, "http.method") where name == "unknownOperation"`}, + }, + { + name: "inner field", + queries: []string{`set(status.code, 1) where attributes["http.path"] == "/health"`}, + }, + { + name: "inner field both spans", + queries: []string{ + `set(status.code, 1) where name == "operationA"`, + `set(status.code, 2) where name == "operationB"`, + }, + }, + } + + for _, tt := range tests { + b.Run(tt.name, func(b *testing.B) { + statements, err := parse(tt.queries) + assert.NoError(b, err) + b.ResetTimer() + for n := 0; n < b.N; n++ { + td := constructTraces() + process(td, statements) + } + }) + } +} + +func BenchmarkHundredSpans(b *testing.B) { + tests := []struct { + name string + queries []string + }{ + { + name: "no processing", + queries: []string{}, + }, + { + name: "set status code", + queries: []string{ + `set(status.code, 1) where name == "operationA"`, + `set(status.code, 2) where name == "operationB"`, + }, + }, + { + name: "hundred statements", + queries: func() []string { + queries := make([]string, 0) + queries = append(queries, `set(status.code, 1) where name == "operationA"`) + for i := 0; i < 99; i++ { + queries = append(queries, `keep(attributes, "http.method") where name == "unknownOperation"`) + } + return queries + }(), + }, + } + for _, tt := range tests { + b.Run(tt.name, func(b *testing.B) { + statements, err := parse(tt.queries) + assert.NoError(b, err) + b.ResetTimer() + for n := 0; n < b.N; n++ { + td := constructTracesNum(100) + process(td, statements) + } + }) + } +} + +func constructTraces() pdata.Traces { + td := pdata.NewTraces() + rs0 := td.ResourceSpans().AppendEmpty() + rs0.Resource().Attributes().InsertString("host.name", "localhost") + rs0ils0 := rs0.InstrumentationLibrarySpans().AppendEmpty() + fillSpanOne(rs0ils0.Spans().AppendEmpty()) + fillSpanTwo(rs0ils0.Spans().AppendEmpty()) + return td +} + +func constructTracesNum(num int) pdata.Traces { + td := pdata.NewTraces() + rs0 := td.ResourceSpans().AppendEmpty() + rs0ils0 := rs0.InstrumentationLibrarySpans().AppendEmpty() + for i := 0; i < num; i++ { + fillSpanOne(rs0ils0.Spans().AppendEmpty()) + } + return td +} + +func fillSpanOne(span pdata.Span) { + span.SetName("operationA") + span.SetStartTimestamp(TestSpanStartTimestamp) + span.SetEndTimestamp(TestSpanEndTimestamp) + span.SetDroppedAttributesCount(1) + span.Attributes().InsertString("http.method", "get") + span.Attributes().InsertString("http.path", "/health") + span.Attributes().InsertString("http.url", "http://localhost/health") + status := span.Status() + status.SetCode(pdata.StatusCodeError) + status.SetMessage("status-cancelled") +} + +func fillSpanTwo(span pdata.Span) { + span.SetName("operationB") + span.SetStartTimestamp(TestSpanStartTimestamp) + span.SetEndTimestamp(TestSpanEndTimestamp) + span.Attributes().InsertString("http.method", "get") + span.Attributes().InsertString("http.path", "/health") + span.Attributes().InsertString("http.url", "http://localhost/health") + link0 := span.Links().AppendEmpty() + link0.SetDroppedAttributesCount(4) + link1 := span.Links().AppendEmpty() + link1.SetDroppedAttributesCount(4) + span.SetDroppedLinksCount(3) + status := span.Status() + status.SetCode(pdata.StatusCodeError) + status.SetMessage("status-cancelled") +} + +func parse(queries []string) ([]Query, error) { + statements := make([]Query, 0) + for _, q := range queries { + statement := Query{} + err := statement.UnmarshalText([]byte(q)) + if err != nil { + return nil, err + } + statements = append(statements, statement) + } + + return statements, nil +} diff --git a/processor/transformprocessor/internal/traces/registry.go b/processor/transformprocessor/internal/traces/registry.go new file mode 100644 index 000000000000..8efd52a10024 --- /dev/null +++ b/processor/transformprocessor/internal/traces/registry.go @@ -0,0 +1,86 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traces // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" + +import ( + "fmt" + "reflect" + + "go.opentelemetry.io/collector/model/pdata" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" +) + +var registry = make(map[string]interface{}) + +func newFunctionCall(inv common.Invocation) (func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{}, error) { + if f, ok := registry[inv.Function]; ok { + fType := reflect.TypeOf(f) + args := make([]reflect.Value, 0) + for i := 0; i < fType.NumIn(); i++ { + argType := fType.In(i) + + if argType.Kind() == reflect.Slice { + switch argType.Elem().Kind() { + case reflect.String: + arg := make([]string, 0) + for j := i; j < len(inv.Arguments); j++ { + if inv.Arguments[j].String == nil { + return nil, fmt.Errorf("invalid argument for slice parameter at position %v, must be string", j) + } + arg = append(arg, *inv.Arguments[j].String) + } + args = append(args, reflect.ValueOf(arg)) + default: + return nil, fmt.Errorf("unsupported slice type for function %v", inv.Function) + } + continue + } + + if i >= len(inv.Arguments) { + return nil, fmt.Errorf("not enough arguments for function %v", inv.Function) + } + argDef := inv.Arguments[i] + switch argType.Name() { + case "getSetter": + arg, err := newGetSetter(argDef) + if err != nil { + return nil, fmt.Errorf("invalid argument at position %v %w", i, err) + } + args = append(args, reflect.ValueOf(arg)) + continue + case "getter": + arg, err := newGetter(argDef) + if err != nil { + return nil, fmt.Errorf("invalid argument at position %v %w", i, err) + } + args = append(args, reflect.ValueOf(arg)) + continue + } + } + val := reflect.ValueOf(f) + ret := val.Call(args) + return ret[0].Interface().(func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{}), nil + } + return nil, fmt.Errorf("undefined function %v", inv.Function) +} + +func registerFunction(name string, fun interface{}) { + registry[name] = fun +} + +func unregisterFunction(name string) { + delete(registry, name) +} diff --git a/processor/transformprocessor/internal/traces/registry_test.go b/processor/transformprocessor/internal/traces/registry_test.go new file mode 100644 index 000000000000..b7706f8617f1 --- /dev/null +++ b/processor/transformprocessor/internal/traces/registry_test.go @@ -0,0 +1,284 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traces + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/model/pdata" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" +) + +func Test_newFunctionCall(t *testing.T) { + input := pdata.NewSpan() + input.SetName("bear") + attrs := pdata.NewAttributeMap() + attrs.InsertString("test", "1") + attrs.InsertInt("test2", 3) + attrs.InsertBool("test3", true) + attrs.CopyTo(input.Attributes()) + + tests := []struct { + name string + inv common.Invocation + want func(pdata.Span) + }{ + { + name: "set name", + inv: common.Invocation{ + Function: "set", + Arguments: []common.Value{ + { + Path: &common.Path{ + Fields: []common.Field{ + { + Name: "name", + }, + }, + }, + }, + { + String: strp("cat"), + }, + }, + }, + want: func(span pdata.Span) { + input.CopyTo(span) + span.SetName("cat") + }, + }, + { + name: "set status.code", + inv: common.Invocation{ + Function: "set", + Arguments: []common.Value{ + { + Path: &common.Path{ + Fields: []common.Field{ + { + Name: "status", + }, + { + Name: "code", + }, + }, + }, + }, + { + Int: intp(1), + }, + }, + }, + want: func(span pdata.Span) { + input.CopyTo(span) + span.Status().SetCode(pdata.StatusCodeOk) + }, + }, + { + name: "keep one", + inv: common.Invocation{ + Function: "keep", + Arguments: []common.Value{ + { + Path: &common.Path{ + Fields: []common.Field{ + { + Name: "attributes", + }, + }, + }, + }, + { + String: strp("test"), + }, + }, + }, + want: func(span pdata.Span) { + input.CopyTo(span) + span.Attributes().Clear() + attrs := pdata.NewAttributeMap() + attrs.InsertString("test", "1") + attrs.CopyTo(span.Attributes()) + }, + }, + { + name: "keep two", + inv: common.Invocation{ + Function: "keep", + Arguments: []common.Value{ + { + Path: &common.Path{ + Fields: []common.Field{ + { + Name: "attributes", + }, + }, + }, + }, + { + String: strp("test"), + }, + { + String: strp("test2"), + }, + }, + }, + want: func(span pdata.Span) { + input.CopyTo(span) + span.Attributes().Clear() + attrs := pdata.NewAttributeMap() + attrs.InsertString("test", "1") + attrs.InsertInt("test2", 3) + attrs.CopyTo(span.Attributes()) + }, + }, + { + name: "keep none", + inv: common.Invocation{ + Function: "keep", + Arguments: []common.Value{ + { + Path: &common.Path{ + Fields: []common.Field{ + { + Name: "attributes", + }, + }, + }, + }, + }, + }, + want: func(span pdata.Span) { + input.CopyTo(span) + span.Attributes().Clear() + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + span := pdata.NewSpan() + input.CopyTo(span) + + evaluate, err := newFunctionCall(tt.inv) + assert.NoError(t, err) + evaluate(span, pdata.NewInstrumentationLibrary(), pdata.NewResource()) + + expected := pdata.NewSpan() + tt.want(expected) + assert.Equal(t, expected, span) + }) + } +} + +func Test_newFunctionCall_invalid(t *testing.T) { + tests := []struct { + name string + inv common.Invocation + }{ + { + name: "unknown function", + inv: common.Invocation{ + Function: "unknownfunc", + Arguments: []common.Value{}, + }, + }, + { + name: "not trace accessor", + inv: common.Invocation{ + Function: "set", + Arguments: []common.Value{ + { + String: strp("not path"), + }, + { + String: strp("cat"), + }, + }, + }, + }, + { + name: "not trace reader (invalid function)", + inv: common.Invocation{ + Function: "set", + Arguments: []common.Value{ + { + Path: &common.Path{ + Fields: []common.Field{ + { + Name: "name", + }, + }, + }, + }, + { + Invocation: &common.Invocation{ + Function: "unknownfunc", + }, + }, + }, + }, + }, + { + name: "not enough args", + inv: common.Invocation{ + Function: "set", + Arguments: []common.Value{ + { + Path: &common.Path{ + Fields: []common.Field{ + { + Name: "name", + }, + }, + }, + }, + { + Invocation: &common.Invocation{ + Function: "unknownfunc", + }, + }, + }, + }, + }, + { + name: "not matching slice type", + inv: common.Invocation{ + Function: "keep", + Arguments: []common.Value{ + { + Path: &common.Path{ + Fields: []common.Field{ + { + Name: "attributes", + }, + }, + }, + }, + { + Int: intp(10), + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := newFunctionCall(tt.inv) + assert.Error(t, err) + }) + } +} diff --git a/processor/transformprocessor/internal/traces/traces.go b/processor/transformprocessor/internal/traces/traces.go new file mode 100644 index 000000000000..e842abc91c47 --- /dev/null +++ b/processor/transformprocessor/internal/traces/traces.go @@ -0,0 +1,368 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traces // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" + +import ( + "encoding/hex" + "fmt" + "time" + + "go.opentelemetry.io/collector/model/pdata" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" +) + +// pathGetSetter is a getSetter which has been resolved using a path expression provided by a user. +type pathGetSetter struct { + getter func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} + setter func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) +} + +func (path pathGetSetter) get(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return path.getter(span, il, resource) +} + +func (path pathGetSetter) set(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + path.setter(span, il, resource, val) +} + +func newGetSetter(val common.Value) (getSetter, error) { + if val.Path == nil { + return nil, fmt.Errorf("must be a trace path expression") + } + + return newPathGetSetter(val.Path.Fields) +} + +func newPathGetSetter(path []common.Field) (getSetter, error) { + var getter func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} + var setter func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) + switch path[0].Name { + case "resource": + mapKey := path[1].MapKey + if mapKey == nil { + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return resource.Attributes() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if attrs, ok := val.(pdata.AttributeMap); ok { + resource.Attributes().Clear() + attrs.CopyTo(resource.Attributes()) + } + } + } else { + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return getAttr(resource.Attributes(), *mapKey) + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + setAttr(resource.Attributes(), *mapKey, val) + } + } + case "instrumentation_library": + if len(path) == 1 { + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return il + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if newIl, ok := val.(pdata.InstrumentationLibrary); ok { + newIl.CopyTo(il) + } + } + } else { + switch path[1].Name { + case "name": + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return il.Name() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if str, ok := val.(string); ok { + il.SetName(str) + } + } + case "version": + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return il.Version() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if str, ok := val.(string); ok { + il.SetVersion(str) + } + } + } + } + case "trace_id": + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.TraceID() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if str, ok := val.(string); ok { + id, _ := hex.DecodeString(str) + var idArr [16]byte + copy(idArr[:16], id) + span.SetTraceID(pdata.NewTraceID(idArr)) + } + } + case "span_id": + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.SpanID() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if str, ok := val.(string); ok { + id, _ := hex.DecodeString(str) + var idArr [8]byte + copy(idArr[:8], id) + span.SetSpanID(pdata.NewSpanID(idArr)) + } + } + case "trace_state": + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.TraceState() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if str, ok := val.(string); ok { + span.SetTraceState(pdata.TraceState(str)) + } + } + case "parent_span_id": + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.ParentSpanID() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if str, ok := val.(string); ok { + id, _ := hex.DecodeString(str) + var idArr [8]byte + copy(idArr[:8], id) + span.SetParentSpanID(pdata.NewSpanID(idArr)) + } + } + case "name": + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.Name() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if str, ok := val.(string); ok { + span.SetName(str) + } + } + case "kind": + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.Kind() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if i, ok := val.(int64); ok { + span.SetKind(pdata.SpanKind(i)) + } + } + case "start_time_unix_nano": + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.StartTimestamp().AsTime().UnixNano() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if i, ok := val.(int64); ok { + span.SetStartTimestamp(pdata.NewTimestampFromTime(time.Unix(0, i))) + } + } + case "end_time_unix_nano": + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.EndTimestamp().AsTime().UnixNano() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if i, ok := val.(int64); ok { + span.SetEndTimestamp(pdata.NewTimestampFromTime(time.Unix(0, i))) + } + } + case "attributes": + mapKey := path[0].MapKey + if mapKey == nil { + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.Attributes() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if attrs, ok := val.(pdata.AttributeMap); ok { + span.Attributes().Clear() + attrs.CopyTo(span.Attributes()) + } + } + } else { + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return getAttr(span.Attributes(), *mapKey) + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + setAttr(span.Attributes(), *mapKey, val) + } + } + case "dropped_attributes_count": + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.DroppedAttributesCount() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if i, ok := val.(int64); ok { + span.SetDroppedAttributesCount(uint32(i)) + } + } + case "events": + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.Events() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if slc, ok := val.(pdata.SpanEventSlice); ok { + span.Events().RemoveIf(func(event pdata.SpanEvent) bool { + return true + }) + slc.CopyTo(span.Events()) + } + } + case "dropped_events_count": + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.DroppedEventsCount() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if i, ok := val.(int64); ok { + span.SetDroppedEventsCount(uint32(i)) + } + } + case "links": + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.Links() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if slc, ok := val.(pdata.SpanLinkSlice); ok { + span.Links().RemoveIf(func(event pdata.SpanLink) bool { + return true + }) + slc.CopyTo(span.Links()) + } + } + case "dropped_links_count": + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.DroppedLinksCount() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if i, ok := val.(int64); ok { + span.SetDroppedLinksCount(uint32(i)) + } + } + case "status": + if len(path) == 1 { + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.Status() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if status, ok := val.(pdata.SpanStatus); ok { + status.CopyTo(span.Status()) + } + } + } else { + switch path[1].Name { + case "code": + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.Status().Code() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if i, ok := val.(int64); ok { + span.Status().SetCode(pdata.StatusCode(i)) + } + } + case "message": + getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.Status().Message() + } + setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if str, ok := val.(string); ok { + span.Status().SetMessage(str) + } + } + } + } + default: + return nil, fmt.Errorf("invalid path expression, unrecognized field %v", path[0].Name) + } + + return pathGetSetter{ + getter: getter, + setter: setter, + }, nil +} + +func getAttr(attrs pdata.AttributeMap, mapKey string) interface{} { + val, ok := attrs.Get(mapKey) + if !ok { + return nil + } + switch val.Type() { + case pdata.AttributeValueTypeString: + return val.StringVal() + case pdata.AttributeValueTypeBool: + return val.BoolVal() + case pdata.AttributeValueTypeInt: + return val.IntVal() + case pdata.AttributeValueTypeDouble: + return val.DoubleVal() + case pdata.AttributeValueTypeMap: + return val.MapVal() + case pdata.AttributeValueTypeArray: + return val.SliceVal() + case pdata.AttributeValueTypeBytes: + return val.BytesVal() + } + return nil +} + +func setAttr(attrs pdata.AttributeMap, mapKey string, val interface{}) { + switch v := val.(type) { + case string: + attrs.UpsertString(mapKey, v) + case bool: + attrs.UpsertBool(mapKey, v) + case int64: + attrs.UpsertInt(mapKey, v) + case float64: + attrs.UpsertDouble(mapKey, v) + case []byte: + attrs.UpsertBytes(mapKey, v) + case []string: + arr := pdata.NewAttributeValueArray() + for _, str := range v { + arr.SliceVal().AppendEmpty().SetStringVal(str) + } + attrs.Upsert(mapKey, arr) + case []bool: + arr := pdata.NewAttributeValueArray() + for _, b := range v { + arr.SliceVal().AppendEmpty().SetBoolVal(b) + } + attrs.Upsert(mapKey, arr) + case []int64: + arr := pdata.NewAttributeValueArray() + for _, i := range v { + arr.SliceVal().AppendEmpty().SetIntVal(i) + } + attrs.Upsert(mapKey, arr) + case []float64: + arr := pdata.NewAttributeValueArray() + for _, f := range v { + arr.SliceVal().AppendEmpty().SetDoubleVal(f) + } + attrs.Upsert(mapKey, arr) + case [][]byte: + arr := pdata.NewAttributeValueArray() + for _, b := range v { + arr.SliceVal().AppendEmpty().SetBytesVal(b) + } + attrs.Upsert(mapKey, arr) + default: + // TODO(anuraaga): Support set of map type. + } +} diff --git a/processor/transformprocessor/internal/traces/traces_test.go b/processor/transformprocessor/internal/traces/traces_test.go new file mode 100644 index 000000000000..e225e0f1f555 --- /dev/null +++ b/processor/transformprocessor/internal/traces/traces_test.go @@ -0,0 +1,544 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package traces + +import ( + "encoding/hex" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/model/pdata" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" +) + +var ( + traceID = [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} + traceID2 = [16]byte{16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1} + spanID = [8]byte{1, 2, 3, 4, 5, 6, 7, 8} + spanID2 = [8]byte{8, 7, 6, 5, 4, 3, 2, 1} +) + +func Test_newPathGetSetter(t *testing.T) { + refSpan, _, _ := createTelemetry() + + newAttrs := pdata.NewAttributeMap() + newAttrs.UpsertString("hello", "world") + + newEvents := pdata.NewSpanEventSlice() + newEvents.AppendEmpty().SetName("new event") + + newLinks := pdata.NewSpanLinkSlice() + newLinks.AppendEmpty().SetSpanID(pdata.NewSpanID(spanID2)) + + newStatus := pdata.NewSpanStatus() + newStatus.SetMessage("new status") + + newArrStr := pdata.NewAttributeValueArray() + newArrStr.SliceVal().AppendEmpty().SetStringVal("new") + + newArrBool := pdata.NewAttributeValueArray() + newArrBool.SliceVal().AppendEmpty().SetBoolVal(false) + + newArrInt := pdata.NewAttributeValueArray() + newArrInt.SliceVal().AppendEmpty().SetIntVal(20) + + newArrFloat := pdata.NewAttributeValueArray() + newArrFloat.SliceVal().AppendEmpty().SetDoubleVal(2.0) + + newArrBytes := pdata.NewAttributeValueArray() + newArrBytes.SliceVal().AppendEmpty().SetBytesVal([]byte{9, 6, 4}) + + tests := []struct { + name string + path []common.Field + orig interface{} + new interface{} + modified func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) + }{ + { + name: "trace_id", + path: []common.Field{ + { + Name: "trace_id", + }, + }, + orig: pdata.NewTraceID(traceID), + new: hex.EncodeToString(traceID2[:]), + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.SetTraceID(pdata.NewTraceID(traceID2)) + }, + }, + { + name: "span_id", + path: []common.Field{ + { + Name: "span_id", + }, + }, + orig: pdata.NewSpanID(spanID), + new: hex.EncodeToString(spanID2[:]), + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.SetSpanID(pdata.NewSpanID(spanID2)) + }, + }, + { + name: "trace_state", + path: []common.Field{ + { + Name: "trace_state", + }, + }, + orig: pdata.TraceState("state"), + new: "newstate", + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.SetTraceState("newstate") + }, + }, + { + name: "parent_span_id", + path: []common.Field{ + { + Name: "parent_span_id", + }, + }, + orig: pdata.NewSpanID(spanID2), + new: hex.EncodeToString(spanID[:]), + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.SetParentSpanID(pdata.NewSpanID(spanID)) + }, + }, + { + name: "name", + path: []common.Field{ + { + Name: "name", + }, + }, + orig: "bear", + new: "cat", + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.SetName("cat") + }, + }, + { + name: "kind", + path: []common.Field{ + { + Name: "kind", + }, + }, + orig: pdata.SpanKindServer, + new: int64(3), + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.SetKind(pdata.SpanKindClient) + }, + }, + { + name: "start_time_unix_nano", + path: []common.Field{ + { + Name: "start_time_unix_nano", + }, + }, + orig: int64(100_000_000), + new: int64(200_000_000), + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.SetStartTimestamp(pdata.NewTimestampFromTime(time.UnixMilli(200))) + }, + }, + { + name: "end_time_unix_nano", + path: []common.Field{ + { + Name: "end_time_unix_nano", + }, + }, + orig: int64(500_000_000), + new: int64(200_000_000), + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.SetEndTimestamp(pdata.NewTimestampFromTime(time.UnixMilli(200))) + }, + }, + { + name: "attributes", + path: []common.Field{ + { + Name: "attributes", + }, + }, + orig: refSpan.Attributes(), + new: newAttrs, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.Attributes().Clear() + newAttrs.CopyTo(span.Attributes()) + }, + }, + { + name: "attributes string", + path: []common.Field{ + { + Name: "attributes", + MapKey: strp("str"), + }, + }, + orig: "val", + new: "newVal", + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.Attributes().UpsertString("str", "newVal") + }, + }, + { + name: "attributes bool", + path: []common.Field{ + { + Name: "attributes", + MapKey: strp("bool"), + }, + }, + orig: true, + new: false, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.Attributes().UpsertBool("bool", false) + }, + }, + { + name: "attributes int", + path: []common.Field{ + { + Name: "attributes", + MapKey: strp("int"), + }, + }, + orig: int64(10), + new: int64(20), + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.Attributes().UpsertInt("int", 20) + }, + }, + { + name: "attributes float", + path: []common.Field{ + { + Name: "attributes", + MapKey: strp("double"), + }, + }, + orig: float64(1.2), + new: float64(2.4), + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.Attributes().UpsertDouble("double", 2.4) + }, + }, + { + name: "attributes bytes", + path: []common.Field{ + { + Name: "attributes", + MapKey: strp("bytes"), + }, + }, + orig: []byte{1, 3, 2}, + new: []byte{2, 3, 4}, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.Attributes().UpsertBytes("bytes", []byte{2, 3, 4}) + }, + }, + { + name: "attributes array string", + path: []common.Field{ + { + Name: "attributes", + MapKey: strp("arr_str"), + }, + }, + orig: func() pdata.AttributeValueSlice { + val, _ := refSpan.Attributes().Get("arr_str") + return val.SliceVal() + }(), + new: []string{"new"}, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.Attributes().Upsert("arr_str", newArrStr) + }, + }, + { + name: "attributes array bool", + path: []common.Field{ + { + Name: "attributes", + MapKey: strp("arr_bool"), + }, + }, + orig: func() pdata.AttributeValueSlice { + val, _ := refSpan.Attributes().Get("arr_bool") + return val.SliceVal() + }(), + new: []bool{false}, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.Attributes().Upsert("arr_bool", newArrBool) + }, + }, + { + name: "attributes array int", + path: []common.Field{ + { + Name: "attributes", + MapKey: strp("arr_int"), + }, + }, + orig: func() pdata.AttributeValueSlice { + val, _ := refSpan.Attributes().Get("arr_int") + return val.SliceVal() + }(), + new: []int64{20}, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.Attributes().Upsert("arr_int", newArrInt) + }, + }, + { + name: "attributes array float", + path: []common.Field{ + { + Name: "attributes", + MapKey: strp("arr_float"), + }, + }, + orig: func() pdata.AttributeValueSlice { + val, _ := refSpan.Attributes().Get("arr_float") + return val.SliceVal() + }(), + new: []float64{2.0}, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.Attributes().Upsert("arr_float", newArrFloat) + }, + }, + { + name: "attributes array bytes", + path: []common.Field{ + { + Name: "attributes", + MapKey: strp("arr_bytes"), + }, + }, + orig: func() pdata.AttributeValueSlice { + val, _ := refSpan.Attributes().Get("arr_bytes") + return val.SliceVal() + }(), + new: [][]byte{{9, 6, 4}}, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.Attributes().Upsert("arr_bytes", newArrBytes) + }, + }, + { + name: "dropped_attributes_count", + path: []common.Field{ + { + Name: "dropped_attributes_count", + }, + }, + orig: uint32(10), + new: int64(20), + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.SetDroppedAttributesCount(20) + }, + }, + { + name: "events", + path: []common.Field{ + { + Name: "events", + }, + }, + orig: refSpan.Events(), + new: newEvents, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.Events().RemoveIf(func(_ pdata.SpanEvent) bool { + return true + }) + newEvents.CopyTo(span.Events()) + }, + }, + { + name: "dropped_events_count", + path: []common.Field{ + { + Name: "dropped_events_count", + }, + }, + orig: uint32(20), + new: int64(30), + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.SetDroppedEventsCount(30) + }, + }, + { + name: "links", + path: []common.Field{ + { + Name: "links", + }, + }, + orig: refSpan.Links(), + new: newLinks, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.Links().RemoveIf(func(_ pdata.SpanLink) bool { + return true + }) + newLinks.CopyTo(span.Links()) + }, + }, + { + name: "dropped_links_count", + path: []common.Field{ + { + Name: "dropped_links_count", + }, + }, + orig: uint32(30), + new: int64(40), + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.SetDroppedLinksCount(40) + }, + }, + { + name: "status", + path: []common.Field{ + { + Name: "status", + }, + }, + orig: refSpan.Status(), + new: newStatus, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + newStatus.CopyTo(span.Status()) + }, + }, + { + name: "status code", + path: []common.Field{ + { + Name: "status", + }, + { + Name: "code", + }, + }, + orig: pdata.StatusCodeOk, + new: int64(pdata.StatusCodeError), + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.Status().SetCode(pdata.StatusCodeError) + }, + }, + { + name: "status message", + path: []common.Field{ + { + Name: "status", + }, + { + Name: "message", + }, + }, + orig: "good span", + new: "bad span", + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + span.Status().SetMessage("bad span") + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + accessor, err := newPathGetSetter(tt.path) + assert.NoError(t, err) + + span, il, resource := createTelemetry() + + got := accessor.get(span, il, resource) + assert.Equal(t, tt.orig, got) + + accessor.set(span, il, resource, tt.new) + + exSpan, exIl, exRes := createTelemetry() + tt.modified(exSpan, exIl, exRes) + + assert.Equal(t, exSpan, span) + assert.Equal(t, exIl, il) + assert.Equal(t, exRes, resource) + }) + } +} + +func createTelemetry() (pdata.Span, pdata.InstrumentationLibrary, pdata.Resource) { + span := pdata.NewSpan() + span.SetTraceID(pdata.NewTraceID(traceID)) + span.SetSpanID(pdata.NewSpanID(spanID)) + span.SetTraceState("state") + span.SetParentSpanID(pdata.NewSpanID(spanID2)) + span.SetName("bear") + span.SetKind(pdata.SpanKindServer) + span.SetStartTimestamp(pdata.NewTimestampFromTime(time.UnixMilli(100))) + span.SetEndTimestamp(pdata.NewTimestampFromTime(time.UnixMilli(500))) + span.Attributes().UpsertString("str", "val") + span.Attributes().UpsertBool("bool", true) + span.Attributes().UpsertInt("int", 10) + span.Attributes().UpsertDouble("double", 1.2) + span.Attributes().UpsertBytes("bytes", []byte{1, 3, 2}) + + arrStr := pdata.NewAttributeValueArray() + arrStr.SliceVal().AppendEmpty().SetStringVal("one") + arrStr.SliceVal().AppendEmpty().SetStringVal("two") + span.Attributes().Upsert("arr_str", arrStr) + + arrBool := pdata.NewAttributeValueArray() + arrBool.SliceVal().AppendEmpty().SetBoolVal(true) + arrBool.SliceVal().AppendEmpty().SetBoolVal(false) + span.Attributes().Upsert("arr_bool", arrBool) + + arrInt := pdata.NewAttributeValueArray() + arrInt.SliceVal().AppendEmpty().SetIntVal(2) + arrInt.SliceVal().AppendEmpty().SetIntVal(3) + span.Attributes().Upsert("arr_int", arrInt) + + arrFloat := pdata.NewAttributeValueArray() + arrFloat.SliceVal().AppendEmpty().SetDoubleVal(1.0) + arrFloat.SliceVal().AppendEmpty().SetDoubleVal(2.0) + span.Attributes().Upsert("arr_float", arrFloat) + + arrBytes := pdata.NewAttributeValueArray() + arrBytes.SliceVal().AppendEmpty().SetBytesVal([]byte{1, 2, 3}) + arrBytes.SliceVal().AppendEmpty().SetBytesVal([]byte{2, 3, 4}) + span.Attributes().Upsert("arr_bytes", arrBytes) + + span.SetDroppedAttributesCount(10) + + span.Events().AppendEmpty().SetName("event") + span.SetDroppedEventsCount(20) + + span.Links().AppendEmpty().SetTraceID(pdata.NewTraceID(traceID)) + span.SetDroppedLinksCount(30) + + span.Status().SetCode(pdata.StatusCodeOk) + span.Status().SetMessage("good span") + + il := pdata.NewInstrumentationLibrary() + il.SetName("library") + il.SetVersion("version") + + resource := pdata.NewResource() + span.Attributes().CopyTo(resource.Attributes()) + + return span, il, resource +} From d234f06359955ee2474c99e6041a18921757f47e Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Mon, 24 Jan 2022 06:28:51 +0000 Subject: [PATCH 02/16] Global parser --- .../internal/common/parser.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/processor/transformprocessor/internal/common/parser.go b/processor/transformprocessor/internal/common/parser.go index f4b7db0b67d2..4626082524d3 100644 --- a/processor/transformprocessor/internal/common/parser.go +++ b/processor/transformprocessor/internal/common/parser.go @@ -65,14 +65,11 @@ type Field struct { MapKey *string `( "[" @String "]" )?` } -func Parse(raw string) (*ParsedQuery, error) { - parser, err := newParser() - if err != nil { - return &ParsedQuery{}, err - } +var parser = newParser() +func Parse(raw string) (*ParsedQuery, error) { parsed := &ParsedQuery{} - err = parser.ParseString("", raw, parsed) + err := parser.ParseString("", raw, parsed) if err != nil { return nil, err } @@ -81,7 +78,7 @@ func Parse(raw string) (*ParsedQuery, error) { // newParser returns a parser that can be used to read a string into a ParsedQuery. An error will be returned if the string // is not formatted for the DSL. -func newParser() (*participle.Parser, error) { +func newParser() *participle.Parser { lex := lexer.MustSimple([]lexer.Rule{ {Name: `Ident`, Pattern: `[a-zA-Z_][a-zA-Z0-9_]*`, Action: nil}, {Name: `Float`, Pattern: `[-+]?\d*\.\d+([eE][-+]?\d+)?`, Action: nil}, @@ -90,9 +87,13 @@ func newParser() (*participle.Parser, error) { {Name: `Operators`, Pattern: `==|!=|[,.()\[\]]`, Action: nil}, {Name: "whitespace", Pattern: `\s+`, Action: nil}, }) - return participle.Build(&ParsedQuery{}, + parser, err := participle.Build(&ParsedQuery{}, participle.Lexer(lex), participle.Unquote("String"), participle.Elide("whitespace"), ) + if err != nil { + panic("Unable to initialize parser, this is a programming error in the transformprocesor") + } + return parser } From a7783358d8133db6e5198e39bbdfb32d268b620d Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Mon, 24 Jan 2022 06:38:23 +0000 Subject: [PATCH 03/16] func aliases --- processor/transformprocessor/internal/traces/condition.go | 4 +++- processor/transformprocessor/internal/traces/expression.go | 2 ++ .../transformprocessor/internal/traces/expression_test.go | 2 +- processor/transformprocessor/internal/traces/functions.go | 4 ++-- processor/transformprocessor/internal/traces/processor.go | 4 ++-- processor/transformprocessor/internal/traces/registry.go | 6 ++---- processor/transformprocessor/internal/traces/traces.go | 4 ++-- 7 files changed, 14 insertions(+), 12 deletions(-) diff --git a/processor/transformprocessor/internal/traces/condition.go b/processor/transformprocessor/internal/traces/condition.go index e7d2a44ffb62..24d97300305b 100644 --- a/processor/transformprocessor/internal/traces/condition.go +++ b/processor/transformprocessor/internal/traces/condition.go @@ -22,7 +22,9 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) -func newConditionEvaluator(cond *common.Condition) (func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) bool, error) { +type condFunc = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) bool + +func newConditionEvaluator(cond *common.Condition) (condFunc, error) { if cond == nil { return func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) bool { return true diff --git a/processor/transformprocessor/internal/traces/expression.go b/processor/transformprocessor/internal/traces/expression.go index 08a1e4c97ebc..1e625c0ff112 100644 --- a/processor/transformprocessor/internal/traces/expression.go +++ b/processor/transformprocessor/internal/traces/expression.go @@ -22,6 +22,8 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) +type exprFunc func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} + // getter allows reading a value while processing traces. Note that data is not necessarily read from input // telemetry but may be a literal value or a function invocation. type getter interface { diff --git a/processor/transformprocessor/internal/traces/expression_test.go b/processor/transformprocessor/internal/traces/expression_test.go index b91ad10ba86f..1fa48a81182c 100644 --- a/processor/transformprocessor/internal/traces/expression_test.go +++ b/processor/transformprocessor/internal/traces/expression_test.go @@ -23,7 +23,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) -func hello() func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { +func hello() exprFunc { return func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { return "world" } diff --git a/processor/transformprocessor/internal/traces/functions.go b/processor/transformprocessor/internal/traces/functions.go index 74ad33e6f542..4786a705a939 100644 --- a/processor/transformprocessor/internal/traces/functions.go +++ b/processor/transformprocessor/internal/traces/functions.go @@ -18,7 +18,7 @@ import ( "go.opentelemetry.io/collector/model/pdata" ) -func set(target getSetter, value getter) func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { +func set(target getSetter, value getter) exprFunc { return func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { val := value.get(span, il, resource) if val != nil { @@ -28,7 +28,7 @@ func set(target getSetter, value getter) func(span pdata.Span, il pdata.Instrume } } -func keep(target getSetter, keys []string) func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { +func keep(target getSetter, keys []string) exprFunc { keySet := make(map[string]struct{}, len(keys)) for _, key := range keys { keySet[key] = struct{}{} diff --git a/processor/transformprocessor/internal/traces/processor.go b/processor/transformprocessor/internal/traces/processor.go index eee8f991ec4e..46b425c0cddb 100644 --- a/processor/transformprocessor/internal/traces/processor.go +++ b/processor/transformprocessor/internal/traces/processor.go @@ -32,8 +32,8 @@ type Processor struct { // Query holds a top level Query for processing trace data. A Query is a combination of a function // invocation and the condition to match telemetry for invoking the function. type Query struct { - function func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} - condition func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) bool + function exprFunc + condition condFunc } func NewProcessor(statements []Query, settings component.ProcessorCreateSettings) (*Processor, error) { diff --git a/processor/transformprocessor/internal/traces/registry.go b/processor/transformprocessor/internal/traces/registry.go index 8efd52a10024..69738756a3d1 100644 --- a/processor/transformprocessor/internal/traces/registry.go +++ b/processor/transformprocessor/internal/traces/registry.go @@ -18,14 +18,12 @@ import ( "fmt" "reflect" - "go.opentelemetry.io/collector/model/pdata" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) var registry = make(map[string]interface{}) -func newFunctionCall(inv common.Invocation) (func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{}, error) { +func newFunctionCall(inv common.Invocation) (exprFunc, error) { if f, ok := registry[inv.Function]; ok { fType := reflect.TypeOf(f) args := make([]reflect.Value, 0) @@ -72,7 +70,7 @@ func newFunctionCall(inv common.Invocation) (func(span pdata.Span, il pdata.Inst } val := reflect.ValueOf(f) ret := val.Call(args) - return ret[0].Interface().(func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{}), nil + return ret[0].Interface().(exprFunc), nil } return nil, fmt.Errorf("undefined function %v", inv.Function) } diff --git a/processor/transformprocessor/internal/traces/traces.go b/processor/transformprocessor/internal/traces/traces.go index e842abc91c47..235043d724d5 100644 --- a/processor/transformprocessor/internal/traces/traces.go +++ b/processor/transformprocessor/internal/traces/traces.go @@ -26,7 +26,7 @@ import ( // pathGetSetter is a getSetter which has been resolved using a path expression provided by a user. type pathGetSetter struct { - getter func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} + getter exprFunc setter func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) } @@ -47,7 +47,7 @@ func newGetSetter(val common.Value) (getSetter, error) { } func newPathGetSetter(path []common.Field) (getSetter, error) { - var getter func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} + var getter exprFunc var setter func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) switch path[0].Name { case "resource": From e4273ee5c1449d29b47576af0ced1b02a9d63332 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Thu, 27 Jan 2022 07:09:08 +0000 Subject: [PATCH 04/16] Start cleanups --- .../internal/traces/condition.go | 13 ++++++++---- .../internal/traces/expression.go | 20 ++++++++----------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/processor/transformprocessor/internal/traces/condition.go b/processor/transformprocessor/internal/traces/condition.go index 24d97300305b..a6083297f765 100644 --- a/processor/transformprocessor/internal/traces/condition.go +++ b/processor/transformprocessor/internal/traces/condition.go @@ -24,17 +24,20 @@ import ( type condFunc = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) bool +var alwaysTrue = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) bool { + return true +} + func newConditionEvaluator(cond *common.Condition) (condFunc, error) { if cond == nil { - return func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) bool { - return true - }, nil + return alwaysTrue, nil } left, err := newGetter(cond.Left) if err != nil { return nil, err } right, err := newGetter(cond.Right) + // TODO(anuraaga): Check if both left and right are literals and const-evaluate if err != nil { return nil, err } @@ -48,7 +51,9 @@ func newConditionEvaluator(cond *common.Condition) (condFunc, error) { }, nil case "!=": return func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) bool { - return left.get(span, il, resource) != right.get(span, il, resource) + a := left.get(span, il, resource) + b := right.get(span, il, resource) + return a != b }, nil } diff --git a/processor/transformprocessor/internal/traces/expression.go b/processor/transformprocessor/internal/traces/expression.go index 1e625c0ff112..f621b09d2fa0 100644 --- a/processor/transformprocessor/internal/traces/expression.go +++ b/processor/transformprocessor/internal/traces/expression.go @@ -41,23 +41,19 @@ type literal struct { value interface{} } -func (l literal) get(_ pdata.Span, _ pdata.InstrumentationLibrary, _ pdata.Resource) interface{} { +func (l literal) get(pdata.Span, pdata.InstrumentationLibrary, pdata.Resource) interface{} { return l.value } func newGetter(val common.Value) (getter, error) { if s := val.String; s != nil { - return &literal{ - value: *s, - }, nil - } else if f := val.Float; f != nil { - return &literal{ - value: *f, - }, nil - } else if i := val.Int; i != nil { - return &literal{ - value: *i, - }, nil + return &literal{value: *s}, nil + } + if f := val.Float; f != nil { + return &literal{value: *f}, nil + } + if i := val.Int; i != nil { + return &literal{value: *i}, nil } if val.Path != nil { From cc83968b0102b72c0add054846ed03ebfbed6700 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Tue, 1 Feb 2022 08:26:31 +0000 Subject: [PATCH 05/16] Cleanups --- processor/transformprocessor/config.go | 4 +++- processor/transformprocessor/config_test.go | 22 +++++++++---------- processor/transformprocessor/factory.go | 19 ++++++++++------ processor/transformprocessor/factory_test.go | 4 +++- processor/transformprocessor/go.mod | 2 +- .../internal/traces/functions.go | 2 ++ .../internal/traces/processor.go | 8 ++----- .../internal/traces/processor_test.go | 10 +++++++-- 8 files changed, 42 insertions(+), 29 deletions(-) diff --git a/processor/transformprocessor/config.go b/processor/transformprocessor/config.go index 288e7089e2e3..7d5532e9644f 100644 --- a/processor/transformprocessor/config.go +++ b/processor/transformprocessor/config.go @@ -16,10 +16,12 @@ package transformprocessor // import "github.com/open-telemetry/opentelemetry-co import ( "go.opentelemetry.io/collector/config" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" ) type TracesConfig struct { - Queries []string `mapstructure:"queries"` + Queries []traces.Query `mapstructure:"queries"` } type Config struct { diff --git a/processor/transformprocessor/config_test.go b/processor/transformprocessor/config_test.go index cc9959ea6e92..bff8e3be5d5c 100644 --- a/processor/transformprocessor/config_test.go +++ b/processor/transformprocessor/config_test.go @@ -21,7 +21,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/service/servicetest" ) @@ -35,14 +34,15 @@ func TestLoadingConfig(t *testing.T) { assert.NoError(t, err) require.NotNil(t, cfg) - p0 := cfg.Processors[config.NewComponentID(typeStr)] - assert.Equal(t, p0, &Config{ - ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), - Traces: TracesConfig{ - Queries: []string{ - `set(name, "bear") where attributes["http.path"] == "/animal"`, - `keep(attributes, "http.method", "http.path")`, - }, - }, - }) + // TODO(anuraaga): Fix + // p0 := cfg.Processors[config.NewComponentID(typeStr)] + //assert.Equal(t, p0, &Config{ + // ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), + // Traces: TracesConfig{ + // Queries: []string{ + // `set(name, "bear") where attributes["http.path"] == "/animal"`, + // `keep(attributes, "http.method", "http.path")`, + // }, + // }, + //}) } diff --git a/processor/transformprocessor/factory.go b/processor/transformprocessor/factory.go index f7d76a5f65de..5da02f013037 100644 --- a/processor/transformprocessor/factory.go +++ b/processor/transformprocessor/factory.go @@ -16,12 +16,14 @@ package transformprocessor // import "github.com/open-telemetry/opentelemetry-co import ( "context" + "fmt" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/model/pdata" "go.opentelemetry.io/collector/processor/processorhelper" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" ) const ( @@ -42,23 +44,26 @@ func createDefaultConfig() config.Processor { return &Config{ ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), Traces: TracesConfig{ - Queries: []string{}, + Queries: []traces.Query{}, }, } } func createTracesProcessor( _ context.Context, - _ component.ProcessorCreateSettings, + settings component.ProcessorCreateSettings, cfg config.Processor, nextConsumer consumer.Traces, ) (component.TracesProcessor, error) { + oCfg := cfg.(*Config) + + proc, err := traces.NewProcessor(oCfg.Traces.Queries, settings) + if err != nil { + return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err) + } return processorhelper.NewTracesProcessor( cfg, nextConsumer, - // TODO(anuraaga): Replace with business logic. - func(ctx context.Context, p pdata.Traces) (pdata.Traces, error) { - return p, nil - }, + proc.ProcessTraces, processorhelper.WithCapabilities(processorCapabilities)) } diff --git a/processor/transformprocessor/factory_test.go b/processor/transformprocessor/factory_test.go index 33c78e9181d5..9f5fa49ee937 100644 --- a/processor/transformprocessor/factory_test.go +++ b/processor/transformprocessor/factory_test.go @@ -20,6 +20,8 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configtest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" ) func TestFactory_Type(t *testing.T) { @@ -33,7 +35,7 @@ func TestFactory_CreateDefaultConfig(t *testing.T) { assert.Equal(t, cfg, &Config{ ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), Traces: TracesConfig{ - Queries: []string{}, + Queries: []traces.Query{}, }, }) assert.NoError(t, configtest.CheckConfigStruct(cfg)) diff --git a/processor/transformprocessor/go.mod b/processor/transformprocessor/go.mod index d95f38fd11a1..f0d6d2b9af04 100644 --- a/processor/transformprocessor/go.mod +++ b/processor/transformprocessor/go.mod @@ -7,6 +7,7 @@ require ( github.com/stretchr/testify v1.7.0 go.opentelemetry.io/collector v0.41.1-0.20211222180302-3db1d1146483 go.opentelemetry.io/collector/model v0.41.1-0.20211222180302-3db1d1146483 + go.uber.org/zap v1.19.1 ) require ( @@ -30,7 +31,6 @@ require ( go.opentelemetry.io/otel/trace v1.3.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.7.0 // indirect - go.uber.org/zap v1.19.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/processor/transformprocessor/internal/traces/functions.go b/processor/transformprocessor/internal/traces/functions.go index 4786a705a939..fbeeaf092547 100644 --- a/processor/transformprocessor/internal/traces/functions.go +++ b/processor/transformprocessor/internal/traces/functions.go @@ -41,7 +41,9 @@ func keep(target getSetter, keys []string) exprFunc { } if attrs, ok := val.(pdata.AttributeMap); ok { + // TODO(anuraaga): Avoid copying when filtering keys https://github.com/open-telemetry/opentelemetry-collector/issues/4756 filtered := pdata.NewAttributeMap() + filtered.EnsureCapacity(attrs.Len()) attrs.Range(func(key string, val pdata.AttributeValue) bool { if _, ok := keySet[key]; ok { filtered.Insert(key, val) diff --git a/processor/transformprocessor/internal/traces/processor.go b/processor/transformprocessor/internal/traces/processor.go index 46b425c0cddb..730f0770a55f 100644 --- a/processor/transformprocessor/internal/traces/processor.go +++ b/processor/transformprocessor/internal/traces/processor.go @@ -44,11 +44,6 @@ func NewProcessor(statements []Query, settings component.ProcessorCreateSettings } func (p *Processor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) { - process(td, p.statements) - return td, nil -} - -func process(td pdata.Traces, statements []Query) { for i := 0; i < td.ResourceSpans().Len(); i++ { rspans := td.ResourceSpans().At(i) for j := 0; j < rspans.InstrumentationLibrarySpans().Len(); j++ { @@ -57,7 +52,7 @@ func process(td pdata.Traces, statements []Query) { for k := 0; k < spans.Len(); k++ { span := spans.At(k) - for _, statement := range statements { + for _, statement := range p.statements { if statement.condition(span, il, rspans.Resource()) { statement.function(span, il, rspans.Resource()) } @@ -65,6 +60,7 @@ func process(td pdata.Traces, statements []Query) { } } } + return td, nil } func (s *Query) UnmarshalText(text []byte) error { diff --git a/processor/transformprocessor/internal/traces/processor_test.go b/processor/transformprocessor/internal/traces/processor_test.go index 4921ee11c577..94b0288d059d 100644 --- a/processor/transformprocessor/internal/traces/processor_test.go +++ b/processor/transformprocessor/internal/traces/processor_test.go @@ -123,10 +123,13 @@ func BenchmarkTwoSpans(b *testing.B) { b.Run(tt.name, func(b *testing.B) { statements, err := parse(tt.queries) assert.NoError(b, err) + processor, err := NewProcessor(statements, component.ProcessorCreateSettings{}) + assert.NoError(b, err) b.ResetTimer() for n := 0; n < b.N; n++ { td := constructTraces() - process(td, statements) + _, err = processor.ProcessTraces(context.Background(), td) + assert.NoError(b, err) } }) } @@ -164,10 +167,13 @@ func BenchmarkHundredSpans(b *testing.B) { b.Run(tt.name, func(b *testing.B) { statements, err := parse(tt.queries) assert.NoError(b, err) + processor, err := NewProcessor(statements, component.ProcessorCreateSettings{}) + assert.NoError(b, err) b.ResetTimer() for n := 0; n < b.N; n++ { td := constructTracesNum(100) - process(td, statements) + _, err = processor.ProcessTraces(context.Background(), td) + assert.NoError(b, err) } }) } From e40879134955d7b1ee29186c6009464cedee13a2 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Tue, 1 Feb 2022 08:54:37 +0000 Subject: [PATCH 06/16] Extract methods for trace access --- .../internal/traces/traces.go | 479 ++++++++++++------ 1 file changed, 314 insertions(+), 165 deletions(-) diff --git a/processor/transformprocessor/internal/traces/traces.go b/processor/transformprocessor/internal/traces/traces.go index 235043d724d5..ddec019abf05 100644 --- a/processor/transformprocessor/internal/traces/traces.go +++ b/processor/transformprocessor/internal/traces/traces.go @@ -47,253 +47,402 @@ func newGetSetter(val common.Value) (getSetter, error) { } func newPathGetSetter(path []common.Field) (getSetter, error) { - var getter exprFunc - var setter func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) switch path[0].Name { case "resource": - mapKey := path[1].MapKey - if mapKey == nil { - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { - return resource.Attributes() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { - if attrs, ok := val.(pdata.AttributeMap); ok { - resource.Attributes().Clear() - attrs.CopyTo(resource.Attributes()) - } - } - } else { - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { - return getAttr(resource.Attributes(), *mapKey) - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { - setAttr(resource.Attributes(), *mapKey, val) + if len(path) == 1 { + return accessResource(), nil + } + switch path[1].Name { + case "attributes": + mapKey := path[1].MapKey + if mapKey == nil { + return accessResourceAttributes(), nil } + return accessResourceAttributesKey(mapKey), nil } case "instrumentation_library": if len(path) == 1 { - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { - return il + return accessInstrumentationLibrary(), nil + } + switch path[1].Name { + case "name": + return accessInstrumentationLibraryName(), nil + case "version": + return accessInstrumentationLibraryVersion(), nil + } + case "trace_id": + return accessTraceID(), nil + case "span_id": + return accessSpanID(), nil + case "trace_state": + return accessTraceState(), nil + case "parent_span_id": + return accessParentSpanID(), nil + case "name": + return accessName(), nil + case "kind": + return accessKind(), nil + case "start_time_unix_nano": + return accessStartTimeUnixNano(), nil + case "end_time_unix_nano": + return accessEndTimeUnixNano(), nil + case "attributes": + mapKey := path[0].MapKey + if mapKey == nil { + return accessAttributes(), nil + } + return accessAttributesKey(mapKey), nil + case "dropped_attributes_count": + return accessDroppedAttributesCount(), nil + case "events": + return accessEvents(), nil + case "dropped_events_count": + return accessDroppedEventsCount(), nil + case "links": + return accessLinks(), nil + case "dropped_links_count": + return accessDroppedLinksCount(), nil + case "status": + if len(path) == 1 { + return accessStatus(), nil + } + switch path[1].Name { + case "code": + return accessStatusCode(), nil + case "message": + return accessStatusMessage(), nil + } + default: + return nil, fmt.Errorf("invalid path expression, unrecognized field %v", path[0].Name) + } + + return nil, fmt.Errorf("invalid path expression %v", path) +} + +func accessResource() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return resource + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if newRes, ok := val.(pdata.Resource); ok { + resource.Attributes().Clear() + newRes.CopyTo(resource) } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { - if newIl, ok := val.(pdata.InstrumentationLibrary); ok { - newIl.CopyTo(il) - } + }, + } +} + +func accessResourceAttributes() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return resource.Attributes() + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if attrs, ok := val.(pdata.AttributeMap); ok { + resource.Attributes().Clear() + attrs.CopyTo(resource.Attributes()) } - } else { - switch path[1].Name { - case "name": - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { - return il.Name() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { - if str, ok := val.(string); ok { - il.SetName(str) - } - } - case "version": - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { - return il.Version() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { - if str, ok := val.(string); ok { - il.SetVersion(str) - } - } + }, + } +} + +func accessResourceAttributesKey(mapKey *string) pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return getAttr(resource.Attributes(), *mapKey) + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + setAttr(resource.Attributes(), *mapKey, val) + }, + } +} + +func accessInstrumentationLibrary() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return il + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if newIl, ok := val.(pdata.InstrumentationLibrary); ok { + newIl.CopyTo(il) } - } - case "trace_id": - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + }, + } +} + +func accessInstrumentationLibraryName() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return il.Name() + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if str, ok := val.(string); ok { + il.SetName(str) + } + }, + } +} + +func accessInstrumentationLibraryVersion() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return il.Version() + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if str, ok := val.(string); ok { + il.SetVersion(str) + } + }, + } +} + +func accessTraceID() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { return span.TraceID() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { if str, ok := val.(string); ok { id, _ := hex.DecodeString(str) var idArr [16]byte copy(idArr[:16], id) span.SetTraceID(pdata.NewTraceID(idArr)) } - } - case "span_id": - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + }, + } +} + +func accessSpanID() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { return span.SpanID() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { if str, ok := val.(string); ok { id, _ := hex.DecodeString(str) var idArr [8]byte copy(idArr[:8], id) span.SetSpanID(pdata.NewSpanID(idArr)) } - } - case "trace_state": - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + }, + } +} + +func accessTraceState() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { return span.TraceState() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { if str, ok := val.(string); ok { span.SetTraceState(pdata.TraceState(str)) } - } - case "parent_span_id": - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + }, + } +} + +func accessParentSpanID() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { return span.ParentSpanID() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { if str, ok := val.(string); ok { id, _ := hex.DecodeString(str) var idArr [8]byte copy(idArr[:8], id) span.SetParentSpanID(pdata.NewSpanID(idArr)) } - } - case "name": - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + }, + } +} + +func accessName() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { return span.Name() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { if str, ok := val.(string); ok { span.SetName(str) } - } - case "kind": - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + }, + } +} + +func accessKind() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { return span.Kind() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { if i, ok := val.(int64); ok { span.SetKind(pdata.SpanKind(i)) } - } - case "start_time_unix_nano": - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + }, + } +} + +func accessStartTimeUnixNano() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { return span.StartTimestamp().AsTime().UnixNano() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { if i, ok := val.(int64); ok { span.SetStartTimestamp(pdata.NewTimestampFromTime(time.Unix(0, i))) } - } - case "end_time_unix_nano": - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + }, + } +} + +func accessEndTimeUnixNano() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { return span.EndTimestamp().AsTime().UnixNano() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { if i, ok := val.(int64); ok { span.SetEndTimestamp(pdata.NewTimestampFromTime(time.Unix(0, i))) } - } - case "attributes": - mapKey := path[0].MapKey - if mapKey == nil { - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { - return span.Attributes() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { - if attrs, ok := val.(pdata.AttributeMap); ok { - span.Attributes().Clear() - attrs.CopyTo(span.Attributes()) - } - } - } else { - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { - return getAttr(span.Attributes(), *mapKey) - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { - setAttr(span.Attributes(), *mapKey, val) + }, + } +} + +func accessAttributes() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.Attributes() + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if attrs, ok := val.(pdata.AttributeMap); ok { + span.Attributes().Clear() + attrs.CopyTo(span.Attributes()) } - } - case "dropped_attributes_count": - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + }, + } +} + +func accessAttributesKey(mapKey *string) pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return getAttr(span.Attributes(), *mapKey) + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + setAttr(span.Attributes(), *mapKey, val) + }, + } +} + +func accessDroppedAttributesCount() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { return span.DroppedAttributesCount() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { if i, ok := val.(int64); ok { span.SetDroppedAttributesCount(uint32(i)) } - } - case "events": - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + }, + } +} + +func accessEvents() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { return span.Events() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { if slc, ok := val.(pdata.SpanEventSlice); ok { span.Events().RemoveIf(func(event pdata.SpanEvent) bool { return true }) slc.CopyTo(span.Events()) } - } - case "dropped_events_count": - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + }, + } +} + +func accessDroppedEventsCount() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { return span.DroppedEventsCount() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { if i, ok := val.(int64); ok { span.SetDroppedEventsCount(uint32(i)) } - } - case "links": - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + }, + } +} + +func accessLinks() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { return span.Links() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { if slc, ok := val.(pdata.SpanLinkSlice); ok { span.Links().RemoveIf(func(event pdata.SpanLink) bool { return true }) slc.CopyTo(span.Links()) } - } - case "dropped_links_count": - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + }, + } +} + +func accessDroppedLinksCount() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { return span.DroppedLinksCount() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { if i, ok := val.(int64); ok { span.SetDroppedLinksCount(uint32(i)) } - } - case "status": - if len(path) == 1 { - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { - return span.Status() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { - if status, ok := val.(pdata.SpanStatus); ok { - status.CopyTo(span.Status()) - } + }, + } +} + +func accessStatus() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.Status() + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if status, ok := val.(pdata.SpanStatus); ok { + status.CopyTo(span.Status()) } - } else { - switch path[1].Name { - case "code": - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { - return span.Status().Code() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { - if i, ok := val.(int64); ok { - span.Status().SetCode(pdata.StatusCode(i)) - } - } - case "message": - getter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { - return span.Status().Message() - } - setter = func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { - if str, ok := val.(string); ok { - span.Status().SetMessage(str) - } - } + }, + } +} + +func accessStatusCode() pathGetSetter { + return pathGetSetter{ + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.Status().Code() + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if i, ok := val.(int64); ok { + span.Status().SetCode(pdata.StatusCode(i)) } - } - default: - return nil, fmt.Errorf("invalid path expression, unrecognized field %v", path[0].Name) + }, } +} +func accessStatusMessage() pathGetSetter { return pathGetSetter{ - getter: getter, - setter: setter, - }, nil + getter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { + return span.Status().Message() + }, + setter: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) { + if str, ok := val.(string); ok { + span.Status().SetMessage(str) + } + }, + } } func getAttr(attrs pdata.AttributeMap, mapKey string) interface{} { From 18ee45be9a765740f9547f8616517271699c30e6 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Wed, 2 Feb 2022 07:01:45 +0000 Subject: [PATCH 07/16] Remove register / unregisterFunction --- processor/transformprocessor/config.go | 10 ++- processor/transformprocessor/config_test.go | 26 +++--- processor/transformprocessor/factory.go | 6 +- processor/transformprocessor/factory_test.go | 4 +- .../internal/traces/condition.go | 6 +- .../internal/traces/condition_test.go | 4 +- .../internal/traces/expression.go | 4 +- .../internal/traces/expression_test.go | 9 +- .../internal/traces/functions.go | 67 ++++++++++++++- .../{registry_test.go => functions_test.go} | 4 +- .../internal/traces/processor.go | 53 ++++++++---- .../internal/traces/processor_test.go | 26 +----- .../internal/traces/registry.go | 84 ------------------- 13 files changed, 146 insertions(+), 157 deletions(-) rename processor/transformprocessor/internal/traces/{registry_test.go => functions_test.go} (97%) delete mode 100644 processor/transformprocessor/internal/traces/registry.go diff --git a/processor/transformprocessor/config.go b/processor/transformprocessor/config.go index 7d5532e9644f..e2cf1379c3d7 100644 --- a/processor/transformprocessor/config.go +++ b/processor/transformprocessor/config.go @@ -21,7 +21,10 @@ import ( ) type TracesConfig struct { - Queries []traces.Query `mapstructure:"queries"` + Queries []string `mapstructure:"queries"` + + // The functions that have been registered in the extension for traces processing. + functions map[string]interface{} `mapstructure:"-"` } type Config struct { @@ -31,3 +34,8 @@ type Config struct { } var _ config.Processor = (*Config)(nil) + +func (c *Config) Validate() error { + _, err := traces.Parse(c.Traces.Queries, c.Traces.functions) + return err +} diff --git a/processor/transformprocessor/config_test.go b/processor/transformprocessor/config_test.go index bff8e3be5d5c..ce04da33b531 100644 --- a/processor/transformprocessor/config_test.go +++ b/processor/transformprocessor/config_test.go @@ -21,7 +21,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/service/servicetest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" ) func TestLoadingConfig(t *testing.T) { @@ -34,15 +37,16 @@ func TestLoadingConfig(t *testing.T) { assert.NoError(t, err) require.NotNil(t, cfg) - // TODO(anuraaga): Fix - // p0 := cfg.Processors[config.NewComponentID(typeStr)] - //assert.Equal(t, p0, &Config{ - // ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), - // Traces: TracesConfig{ - // Queries: []string{ - // `set(name, "bear") where attributes["http.path"] == "/animal"`, - // `keep(attributes, "http.method", "http.path")`, - // }, - // }, - //}) + p0 := cfg.Processors[config.NewComponentID(typeStr)] + assert.Equal(t, p0, &Config{ + ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), + Traces: TracesConfig{ + Queries: []string{ + `set(name, "bear") where attributes["http.path"] == "/animal"`, + `keep(attributes, "http.method", "http.path")`, + }, + + functions: traces.DefaultFunctions(), + }, + }) } diff --git a/processor/transformprocessor/factory.go b/processor/transformprocessor/factory.go index 5da02f013037..728f62fa3f06 100644 --- a/processor/transformprocessor/factory.go +++ b/processor/transformprocessor/factory.go @@ -44,7 +44,9 @@ func createDefaultConfig() config.Processor { return &Config{ ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), Traces: TracesConfig{ - Queries: []traces.Query{}, + Queries: []string{}, + + functions: traces.DefaultFunctions(), }, } } @@ -57,7 +59,7 @@ func createTracesProcessor( ) (component.TracesProcessor, error) { oCfg := cfg.(*Config) - proc, err := traces.NewProcessor(oCfg.Traces.Queries, settings) + proc, err := traces.NewProcessor(oCfg.Traces.Queries, oCfg.Traces.functions, settings) if err != nil { return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err) } diff --git a/processor/transformprocessor/factory_test.go b/processor/transformprocessor/factory_test.go index 9f5fa49ee937..ffb43d96cab7 100644 --- a/processor/transformprocessor/factory_test.go +++ b/processor/transformprocessor/factory_test.go @@ -35,7 +35,9 @@ func TestFactory_CreateDefaultConfig(t *testing.T) { assert.Equal(t, cfg, &Config{ ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), Traces: TracesConfig{ - Queries: []traces.Query{}, + Queries: []string{}, + + functions: traces.DefaultFunctions(), }, }) assert.NoError(t, configtest.CheckConfigStruct(cfg)) diff --git a/processor/transformprocessor/internal/traces/condition.go b/processor/transformprocessor/internal/traces/condition.go index a6083297f765..90733c903379 100644 --- a/processor/transformprocessor/internal/traces/condition.go +++ b/processor/transformprocessor/internal/traces/condition.go @@ -28,15 +28,15 @@ var alwaysTrue = func(span pdata.Span, il pdata.InstrumentationLibrary, resource return true } -func newConditionEvaluator(cond *common.Condition) (condFunc, error) { +func newConditionEvaluator(cond *common.Condition, functions map[string]interface{}) (condFunc, error) { if cond == nil { return alwaysTrue, nil } - left, err := newGetter(cond.Left) + left, err := newGetter(cond.Left, functions) if err != nil { return nil, err } - right, err := newGetter(cond.Right) + right, err := newGetter(cond.Right, functions) // TODO(anuraaga): Check if both left and right are literals and const-evaluate if err != nil { return nil, err diff --git a/processor/transformprocessor/internal/traces/condition_test.go b/processor/transformprocessor/internal/traces/condition_test.go index c69dd90697fc..bbbbdfefb3d7 100644 --- a/processor/transformprocessor/internal/traces/condition_test.go +++ b/processor/transformprocessor/internal/traces/condition_test.go @@ -103,7 +103,7 @@ func Test_newConditionEvaluator(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - evaluate, err := newConditionEvaluator(tt.cond) + evaluate, err := newConditionEvaluator(tt.cond, DefaultFunctions()) assert.NoError(t, err) assert.True(t, evaluate(tt.matching, pdata.NewInstrumentationLibrary(), pdata.NewResource())) }) @@ -118,7 +118,7 @@ func Test_newConditionEvaluator(t *testing.T) { Right: common.Value{ String: strp("cat"), }, - }) + }, DefaultFunctions()) assert.Error(t, err) }) } diff --git a/processor/transformprocessor/internal/traces/expression.go b/processor/transformprocessor/internal/traces/expression.go index f621b09d2fa0..71b889355e49 100644 --- a/processor/transformprocessor/internal/traces/expression.go +++ b/processor/transformprocessor/internal/traces/expression.go @@ -45,7 +45,7 @@ func (l literal) get(pdata.Span, pdata.InstrumentationLibrary, pdata.Resource) i return l.value } -func newGetter(val common.Value) (getter, error) { +func newGetter(val common.Value, functions map[string]interface{}) (getter, error) { if s := val.String; s != nil { return &literal{value: *s}, nil } @@ -65,7 +65,7 @@ func newGetter(val common.Value) (getter, error) { return nil, fmt.Errorf("no value field set. This is a bug in the transformprocessor") } - call, err := newFunctionCall(*val.Invocation) + call, err := newFunctionCall(*val.Invocation, functions) if err != nil { return nil, err } diff --git a/processor/transformprocessor/internal/traces/expression_test.go b/processor/transformprocessor/internal/traces/expression_test.go index 1fa48a81182c..5b03c91ded7a 100644 --- a/processor/transformprocessor/internal/traces/expression_test.go +++ b/processor/transformprocessor/internal/traces/expression_test.go @@ -30,9 +30,6 @@ func hello() exprFunc { } func Test_newGetter(t *testing.T) { - registerFunction("hello", hello) - defer unregisterFunction("hello") - span := pdata.NewSpan() span.SetName("bear") tests := []struct { @@ -85,9 +82,11 @@ func Test_newGetter(t *testing.T) { }, } + functions := map[string]interface{}{"hello": hello} + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - reader, err := newGetter(tt.val) + reader, err := newGetter(tt.val, functions) assert.NoError(t, err) val := reader.get(span, pdata.NewInstrumentationLibrary(), pdata.NewResource()) assert.Equal(t, tt.want, val) @@ -95,7 +94,7 @@ func Test_newGetter(t *testing.T) { } t.Run("empty value", func(t *testing.T) { - _, err := newGetter(common.Value{}) + _, err := newGetter(common.Value{}, functions) assert.Error(t, err) }) } diff --git a/processor/transformprocessor/internal/traces/functions.go b/processor/transformprocessor/internal/traces/functions.go index fbeeaf092547..9d60682a714f 100644 --- a/processor/transformprocessor/internal/traces/functions.go +++ b/processor/transformprocessor/internal/traces/functions.go @@ -15,9 +15,23 @@ package traces // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" import ( + "fmt" + "reflect" + "go.opentelemetry.io/collector/model/pdata" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) +var registry = map[string]interface{}{ + "keep": keep, + "set": set, +} + +func DefaultFunctions() map[string]interface{} { + return registry +} + func set(target getSetter, value getter) exprFunc { return func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { val := value.get(span, il, resource) @@ -56,7 +70,54 @@ func keep(target getSetter, keys []string) exprFunc { } } -func init() { - registerFunction("keep", keep) - registerFunction("set", set) +func newFunctionCall(inv common.Invocation, functions map[string]interface{}) (exprFunc, error) { + if f, ok := functions[inv.Function]; ok { + fType := reflect.TypeOf(f) + args := make([]reflect.Value, 0) + for i := 0; i < fType.NumIn(); i++ { + argType := fType.In(i) + + if argType.Kind() == reflect.Slice { + switch argType.Elem().Kind() { + case reflect.String: + arg := make([]string, 0) + for j := i; j < len(inv.Arguments); j++ { + if inv.Arguments[j].String == nil { + return nil, fmt.Errorf("invalid argument for slice parameter at position %v, must be string", j) + } + arg = append(arg, *inv.Arguments[j].String) + } + args = append(args, reflect.ValueOf(arg)) + default: + return nil, fmt.Errorf("unsupported slice type for function %v", inv.Function) + } + continue + } + + if i >= len(inv.Arguments) { + return nil, fmt.Errorf("not enough arguments for function %v", inv.Function) + } + argDef := inv.Arguments[i] + switch argType.Name() { + case "getSetter": + arg, err := newGetSetter(argDef) + if err != nil { + return nil, fmt.Errorf("invalid argument at position %v %w", i, err) + } + args = append(args, reflect.ValueOf(arg)) + continue + case "getter": + arg, err := newGetter(argDef, functions) + if err != nil { + return nil, fmt.Errorf("invalid argument at position %v %w", i, err) + } + args = append(args, reflect.ValueOf(arg)) + continue + } + } + val := reflect.ValueOf(f) + ret := val.Call(args) + return ret[0].Interface().(exprFunc), nil + } + return nil, fmt.Errorf("undefined function %v", inv.Function) } diff --git a/processor/transformprocessor/internal/traces/registry_test.go b/processor/transformprocessor/internal/traces/functions_test.go similarity index 97% rename from processor/transformprocessor/internal/traces/registry_test.go rename to processor/transformprocessor/internal/traces/functions_test.go index b7706f8617f1..572a654533dd 100644 --- a/processor/transformprocessor/internal/traces/registry_test.go +++ b/processor/transformprocessor/internal/traces/functions_test.go @@ -173,7 +173,7 @@ func Test_newFunctionCall(t *testing.T) { span := pdata.NewSpan() input.CopyTo(span) - evaluate, err := newFunctionCall(tt.inv) + evaluate, err := newFunctionCall(tt.inv, DefaultFunctions()) assert.NoError(t, err) evaluate(span, pdata.NewInstrumentationLibrary(), pdata.NewResource()) @@ -277,7 +277,7 @@ func Test_newFunctionCall_invalid(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, err := newFunctionCall(tt.inv) + _, err := newFunctionCall(tt.inv, DefaultFunctions()) assert.Error(t, err) }) } diff --git a/processor/transformprocessor/internal/traces/processor.go b/processor/transformprocessor/internal/traces/processor.go index 730f0770a55f..dc827c02083d 100644 --- a/processor/transformprocessor/internal/traces/processor.go +++ b/processor/transformprocessor/internal/traces/processor.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/multierr" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" @@ -36,9 +37,13 @@ type Query struct { condition condFunc } -func NewProcessor(statements []Query, settings component.ProcessorCreateSettings) (*Processor, error) { +func NewProcessor(statements []string, functions map[string]interface{}, settings component.ProcessorCreateSettings) (*Processor, error) { + queries, err := Parse(statements, functions) + if err != nil { + return nil, err + } return &Processor{ - statements: statements, + statements: queries, logger: settings.Logger, }, nil } @@ -63,22 +68,34 @@ func (p *Processor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Tra return td, nil } -func (s *Query) UnmarshalText(text []byte) error { - parsed, err := common.Parse(string(text)) - if err != nil { - return err - } - function, err := newFunctionCall(parsed.Invocation) - if err != nil { - return err - } - condition, err := newConditionEvaluator(parsed.Condition) - if err != nil { - return err +func Parse(statements []string, functions map[string]interface{}) ([]Query, error) { + queries := make([]Query, 0) + var errors error + + for _, statement := range statements { + parsed, err := common.Parse(statement) + if err != nil { + errors = multierr.Append(errors, err) + continue + } + function, err := newFunctionCall(parsed.Invocation, functions) + if err != nil { + errors = multierr.Append(errors, err) + continue + } + condition, err := newConditionEvaluator(parsed.Condition, functions) + if err != nil { + errors = multierr.Append(errors, err) + continue + } + queries = append(queries, Query{ + function: function, + condition: condition, + }) } - *s = Query{ - function: function, - condition: condition, + + if errors != nil { + return nil, errors } - return nil + return queries, nil } diff --git a/processor/transformprocessor/internal/traces/processor_test.go b/processor/transformprocessor/internal/traces/processor_test.go index 94b0288d059d..fef216f0272d 100644 --- a/processor/transformprocessor/internal/traces/processor_test.go +++ b/processor/transformprocessor/internal/traces/processor_test.go @@ -69,9 +69,7 @@ func TestProcess(t *testing.T) { for _, tt := range tests { t.Run(tt.query, func(t *testing.T) { td := constructTraces() - statements, err := parse([]string{tt.query}) - assert.NoError(t, err) - processor, err := NewProcessor(statements, component.ProcessorCreateSettings{}) + processor, err := NewProcessor([]string{tt.query}, DefaultFunctions(), component.ProcessorCreateSettings{}) assert.NoError(t, err) _, err = processor.ProcessTraces(context.Background(), td) @@ -121,9 +119,7 @@ func BenchmarkTwoSpans(b *testing.B) { for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { - statements, err := parse(tt.queries) - assert.NoError(b, err) - processor, err := NewProcessor(statements, component.ProcessorCreateSettings{}) + processor, err := NewProcessor(tt.queries, DefaultFunctions(), component.ProcessorCreateSettings{}) assert.NoError(b, err) b.ResetTimer() for n := 0; n < b.N; n++ { @@ -165,9 +161,7 @@ func BenchmarkHundredSpans(b *testing.B) { } for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { - statements, err := parse(tt.queries) - assert.NoError(b, err) - processor, err := NewProcessor(statements, component.ProcessorCreateSettings{}) + processor, err := NewProcessor(tt.queries, DefaultFunctions(), component.ProcessorCreateSettings{}) assert.NoError(b, err) b.ResetTimer() for n := 0; n < b.N; n++ { @@ -228,17 +222,3 @@ func fillSpanTwo(span pdata.Span) { status.SetCode(pdata.StatusCodeError) status.SetMessage("status-cancelled") } - -func parse(queries []string) ([]Query, error) { - statements := make([]Query, 0) - for _, q := range queries { - statement := Query{} - err := statement.UnmarshalText([]byte(q)) - if err != nil { - return nil, err - } - statements = append(statements, statement) - } - - return statements, nil -} diff --git a/processor/transformprocessor/internal/traces/registry.go b/processor/transformprocessor/internal/traces/registry.go deleted file mode 100644 index 69738756a3d1..000000000000 --- a/processor/transformprocessor/internal/traces/registry.go +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package traces // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" - -import ( - "fmt" - "reflect" - - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" -) - -var registry = make(map[string]interface{}) - -func newFunctionCall(inv common.Invocation) (exprFunc, error) { - if f, ok := registry[inv.Function]; ok { - fType := reflect.TypeOf(f) - args := make([]reflect.Value, 0) - for i := 0; i < fType.NumIn(); i++ { - argType := fType.In(i) - - if argType.Kind() == reflect.Slice { - switch argType.Elem().Kind() { - case reflect.String: - arg := make([]string, 0) - for j := i; j < len(inv.Arguments); j++ { - if inv.Arguments[j].String == nil { - return nil, fmt.Errorf("invalid argument for slice parameter at position %v, must be string", j) - } - arg = append(arg, *inv.Arguments[j].String) - } - args = append(args, reflect.ValueOf(arg)) - default: - return nil, fmt.Errorf("unsupported slice type for function %v", inv.Function) - } - continue - } - - if i >= len(inv.Arguments) { - return nil, fmt.Errorf("not enough arguments for function %v", inv.Function) - } - argDef := inv.Arguments[i] - switch argType.Name() { - case "getSetter": - arg, err := newGetSetter(argDef) - if err != nil { - return nil, fmt.Errorf("invalid argument at position %v %w", i, err) - } - args = append(args, reflect.ValueOf(arg)) - continue - case "getter": - arg, err := newGetter(argDef) - if err != nil { - return nil, fmt.Errorf("invalid argument at position %v %w", i, err) - } - args = append(args, reflect.ValueOf(arg)) - continue - } - } - val := reflect.ValueOf(f) - ret := val.Call(args) - return ret[0].Interface().(exprFunc), nil - } - return nil, fmt.Errorf("undefined function %v", inv.Function) -} - -func registerFunction(name string, fun interface{}) { - registry[name] = fun -} - -func unregisterFunction(name string) { - delete(registry, name) -} From 0f3e6f9fca5c03c399d7aba42fa4fcecda699c04 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Wed, 2 Feb 2022 07:04:40 +0000 Subject: [PATCH 08/16] statement -> query --- .../transformprocessor/internal/traces/processor.go | 10 +++++----- .../internal/traces/processor_test.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/processor/transformprocessor/internal/traces/processor.go b/processor/transformprocessor/internal/traces/processor.go index dc827c02083d..0187c3e2822c 100644 --- a/processor/transformprocessor/internal/traces/processor.go +++ b/processor/transformprocessor/internal/traces/processor.go @@ -26,8 +26,8 @@ import ( ) type Processor struct { - statements []Query - logger *zap.Logger + queries []Query + logger *zap.Logger } // Query holds a top level Query for processing trace data. A Query is a combination of a function @@ -43,8 +43,8 @@ func NewProcessor(statements []string, functions map[string]interface{}, setting return nil, err } return &Processor{ - statements: queries, - logger: settings.Logger, + queries: queries, + logger: settings.Logger, }, nil } @@ -57,7 +57,7 @@ func (p *Processor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Tra for k := 0; k < spans.Len(); k++ { span := spans.At(k) - for _, statement := range p.statements { + for _, statement := range p.queries { if statement.condition(span, il, rspans.Resource()) { statement.function(span, il, rspans.Resource()) } diff --git a/processor/transformprocessor/internal/traces/processor_test.go b/processor/transformprocessor/internal/traces/processor_test.go index fef216f0272d..f79e55e8786d 100644 --- a/processor/transformprocessor/internal/traces/processor_test.go +++ b/processor/transformprocessor/internal/traces/processor_test.go @@ -148,7 +148,7 @@ func BenchmarkHundredSpans(b *testing.B) { }, }, { - name: "hundred statements", + name: "hundred queries", queries: func() []string { queries := make([]string, 0) queries = append(queries, `set(status.code, 1) where name == "operationA"`) From 98582e2f4caccaf591bf7bc9ed2ddc2b60811ae1 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Wed, 2 Feb 2022 07:08:45 +0000 Subject: [PATCH 09/16] Split out setter --- processor/transformprocessor/internal/traces/expression.go | 7 ++++++- processor/transformprocessor/internal/traces/functions.go | 4 +++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/processor/transformprocessor/internal/traces/expression.go b/processor/transformprocessor/internal/traces/expression.go index 71b889355e49..2936ad98535e 100644 --- a/processor/transformprocessor/internal/traces/expression.go +++ b/processor/transformprocessor/internal/traces/expression.go @@ -30,10 +30,15 @@ type getter interface { get(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} } +// setter allows writing a value to trace data. +type setter interface { + set(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) +} + // getSetter allows reading or writing a value to trace data. type getSetter interface { getter - set(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource, val interface{}) + setter } // literal holds a literal value defined as part of a Query. It does not read from telemetry data. diff --git a/processor/transformprocessor/internal/traces/functions.go b/processor/transformprocessor/internal/traces/functions.go index 9d60682a714f..210bcb6eedd3 100644 --- a/processor/transformprocessor/internal/traces/functions.go +++ b/processor/transformprocessor/internal/traces/functions.go @@ -32,7 +32,7 @@ func DefaultFunctions() map[string]interface{} { return registry } -func set(target getSetter, value getter) exprFunc { +func set(target setter, value getter) exprFunc { return func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) interface{} { val := value.get(span, il, resource) if val != nil { @@ -99,6 +99,8 @@ func newFunctionCall(inv common.Invocation, functions map[string]interface{}) (e } argDef := inv.Arguments[i] switch argType.Name() { + case "setter": + fallthrough case "getSetter": arg, err := newGetSetter(argDef) if err != nil { From f88a0bebf44fa1206b051ad354b05d411626da4e Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Wed, 2 Feb 2022 07:09:46 +0000 Subject: [PATCH 10/16] keep -> keep_keys --- processor/transformprocessor/config_test.go | 2 +- .../internal/traces/functions.go | 6 +++--- .../internal/traces/functions_test.go | 14 +++++++------- .../internal/traces/processor_test.go | 10 +++++----- processor/transformprocessor/testdata/config.yaml | 2 +- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/processor/transformprocessor/config_test.go b/processor/transformprocessor/config_test.go index ce04da33b531..0e8afee1c0b8 100644 --- a/processor/transformprocessor/config_test.go +++ b/processor/transformprocessor/config_test.go @@ -43,7 +43,7 @@ func TestLoadingConfig(t *testing.T) { Traces: TracesConfig{ Queries: []string{ `set(name, "bear") where attributes["http.path"] == "/animal"`, - `keep(attributes, "http.method", "http.path")`, + `keep_keys(attributes, "http.method", "http.path")`, }, functions: traces.DefaultFunctions(), diff --git a/processor/transformprocessor/internal/traces/functions.go b/processor/transformprocessor/internal/traces/functions.go index 210bcb6eedd3..a6b2a786843e 100644 --- a/processor/transformprocessor/internal/traces/functions.go +++ b/processor/transformprocessor/internal/traces/functions.go @@ -24,8 +24,8 @@ import ( ) var registry = map[string]interface{}{ - "keep": keep, - "set": set, + "keep_keys": keep_keys, + "set": set, } func DefaultFunctions() map[string]interface{} { @@ -42,7 +42,7 @@ func set(target setter, value getter) exprFunc { } } -func keep(target getSetter, keys []string) exprFunc { +func keep_keys(target getSetter, keys []string) exprFunc { keySet := make(map[string]struct{}, len(keys)) for _, key := range keys { keySet[key] = struct{}{} diff --git a/processor/transformprocessor/internal/traces/functions_test.go b/processor/transformprocessor/internal/traces/functions_test.go index 572a654533dd..79cfadfe0f71 100644 --- a/processor/transformprocessor/internal/traces/functions_test.go +++ b/processor/transformprocessor/internal/traces/functions_test.go @@ -89,9 +89,9 @@ func Test_newFunctionCall(t *testing.T) { }, }, { - name: "keep one", + name: "keep_keys one", inv: common.Invocation{ - Function: "keep", + Function: "keep_keys", Arguments: []common.Value{ { Path: &common.Path{ @@ -116,9 +116,9 @@ func Test_newFunctionCall(t *testing.T) { }, }, { - name: "keep two", + name: "keep_keys two", inv: common.Invocation{ - Function: "keep", + Function: "keep_keys", Arguments: []common.Value{ { Path: &common.Path{ @@ -147,9 +147,9 @@ func Test_newFunctionCall(t *testing.T) { }, }, { - name: "keep none", + name: "keep_keys none", inv: common.Invocation{ - Function: "keep", + Function: "keep_keys", Arguments: []common.Value{ { Path: &common.Path{ @@ -257,7 +257,7 @@ func Test_newFunctionCall_invalid(t *testing.T) { { name: "not matching slice type", inv: common.Invocation{ - Function: "keep", + Function: "keep_keys", Arguments: []common.Value{ { Path: &common.Path{ diff --git a/processor/transformprocessor/internal/traces/processor_test.go b/processor/transformprocessor/internal/traces/processor_test.go index f79e55e8786d..14c514ff8dfe 100644 --- a/processor/transformprocessor/internal/traces/processor_test.go +++ b/processor/transformprocessor/internal/traces/processor_test.go @@ -51,7 +51,7 @@ func TestProcess(t *testing.T) { }, }, { - query: `keep(attributes, "http.method") where name == "operationA"`, + query: `keep_keys(attributes, "http.method") where name == "operationA"`, want: func(td pdata.Traces) { td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).Attributes().Clear() td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).Attributes().InsertString("http.method", "get") @@ -97,12 +97,12 @@ func BenchmarkTwoSpans(b *testing.B) { queries: []string{`set(attributes["test"], "pass") where name == "operationA"`}, }, { - name: "keep attribute", - queries: []string{`keep(attributes, "http.method") where name == "operationA"`}, + name: "keep_keys attribute", + queries: []string{`keep_keys(attributes, "http.method") where name == "operationA"`}, }, { name: "no match", - queries: []string{`keep(attributes, "http.method") where name == "unknownOperation"`}, + queries: []string{`keep_keys(attributes, "http.method") where name == "unknownOperation"`}, }, { name: "inner field", @@ -153,7 +153,7 @@ func BenchmarkHundredSpans(b *testing.B) { queries := make([]string, 0) queries = append(queries, `set(status.code, 1) where name == "operationA"`) for i := 0; i < 99; i++ { - queries = append(queries, `keep(attributes, "http.method") where name == "unknownOperation"`) + queries = append(queries, `keep_keys(attributes, "http.method") where name == "unknownOperation"`) } return queries }(), diff --git a/processor/transformprocessor/testdata/config.yaml b/processor/transformprocessor/testdata/config.yaml index 9bc84cb1b491..a4ee3d5a2eb2 100644 --- a/processor/transformprocessor/testdata/config.yaml +++ b/processor/transformprocessor/testdata/config.yaml @@ -3,7 +3,7 @@ processors: traces: queries: - set(name, "bear") where attributes["http.path"] == "/animal" - - keep(attributes, "http.method", "http.path") + - keep_keys(attributes, "http.method", "http.path") receivers: nop: From 3673a56549e164e72cde2526e750a887426aeda6 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Wed, 2 Feb 2022 07:10:33 +0000 Subject: [PATCH 11/16] lint --- processor/transformprocessor/internal/traces/functions.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processor/transformprocessor/internal/traces/functions.go b/processor/transformprocessor/internal/traces/functions.go index a6b2a786843e..6de193c25e34 100644 --- a/processor/transformprocessor/internal/traces/functions.go +++ b/processor/transformprocessor/internal/traces/functions.go @@ -24,7 +24,7 @@ import ( ) var registry = map[string]interface{}{ - "keep_keys": keep_keys, + "keep_keys": keepKeys, "set": set, } @@ -42,7 +42,7 @@ func set(target setter, value getter) exprFunc { } } -func keep_keys(target getSetter, keys []string) exprFunc { +func keepKeys(target getSetter, keys []string) exprFunc { keySet := make(map[string]struct{}, len(keys)) for _, key := range keys { keySet[key] = struct{}{} From c55590a8d26c165e565dfecf1e908b057e62fb24 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Wed, 2 Feb 2022 07:11:17 +0000 Subject: [PATCH 12/16] TODO(reflect on reflect) --- processor/transformprocessor/internal/traces/functions.go | 1 + 1 file changed, 1 insertion(+) diff --git a/processor/transformprocessor/internal/traces/functions.go b/processor/transformprocessor/internal/traces/functions.go index 6de193c25e34..0db13c277014 100644 --- a/processor/transformprocessor/internal/traces/functions.go +++ b/processor/transformprocessor/internal/traces/functions.go @@ -70,6 +70,7 @@ func keepKeys(target getSetter, keys []string) exprFunc { } } +// TODO(anuraaga): See if reflection can be avoided without complicating definition of transform functions. func newFunctionCall(inv common.Invocation, functions map[string]interface{}) (exprFunc, error) { if f, ok := functions[inv.Function]; ok { fType := reflect.TypeOf(f) From d870f2ac83df2f3d983b2755c0595f37f055eb6d Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Wed, 2 Feb 2022 07:16:07 +0000 Subject: [PATCH 13/16] invalid config test --- processor/transformprocessor/config_test.go | 16 ++++++++++++++++ .../testdata/invalid_config_bad_syntax.yaml | 19 +++++++++++++++++++ .../invalid_config_unknown_function.yaml | 19 +++++++++++++++++++ 3 files changed, 54 insertions(+) create mode 100644 processor/transformprocessor/testdata/invalid_config_bad_syntax.yaml create mode 100644 processor/transformprocessor/testdata/invalid_config_unknown_function.yaml diff --git a/processor/transformprocessor/config_test.go b/processor/transformprocessor/config_test.go index 0e8afee1c0b8..568bc5b3f5af 100644 --- a/processor/transformprocessor/config_test.go +++ b/processor/transformprocessor/config_test.go @@ -50,3 +50,19 @@ func TestLoadingConfig(t *testing.T) { }, }) } + +func TestLoadInvalidConfig(t *testing.T) { + factories, err := componenttest.NopFactories() + assert.NoError(t, err) + + factory := NewFactory() + factories.Processors[typeStr] = factory + + cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_bad_syntax.yaml"), factories) + assert.Error(t, err) + assert.NotNil(t, cfg) + + cfg, err = servicetest.LoadConfigAndValidate(filepath.Join("testdata", "invalid_config_unknown_function.yaml"), factories) + assert.Error(t, err) + assert.NotNil(t, cfg) +} diff --git a/processor/transformprocessor/testdata/invalid_config_bad_syntax.yaml b/processor/transformprocessor/testdata/invalid_config_bad_syntax.yaml new file mode 100644 index 000000000000..18d61839caa8 --- /dev/null +++ b/processor/transformprocessor/testdata/invalid_config_bad_syntax.yaml @@ -0,0 +1,19 @@ +processors: + transform: + traces: + queries: + - set(name, "bear" where attributes["http.path"] == "/animal" + - keep_keys(attributes, "http.method", "http.path") + +receivers: + nop: + +exporters: + nop: + +service: + pipelines: + traces: + receivers: [nop] + processors: [transform] + exporters: [nop] diff --git a/processor/transformprocessor/testdata/invalid_config_unknown_function.yaml b/processor/transformprocessor/testdata/invalid_config_unknown_function.yaml new file mode 100644 index 000000000000..75e62bc40663 --- /dev/null +++ b/processor/transformprocessor/testdata/invalid_config_unknown_function.yaml @@ -0,0 +1,19 @@ +processors: + transform: + traces: + queries: + - set(name, "bear") where attributes["http.path"] == "/animal" + - not_a_function(attributes, "http.method", "http.path") + +receivers: + nop: + +exporters: + nop: + +service: + pipelines: + traces: + receivers: [nop] + processors: [transform] + exporters: [nop] From ab6e39761f53fa19bd8b7df559794b2ff9913336 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Wed, 2 Feb 2022 07:21:36 +0000 Subject: [PATCH 14/16] resource.attributes tests --- .../internal/traces/traces_test.go | 202 ++++++++++++++++++ 1 file changed, 202 insertions(+) diff --git a/processor/transformprocessor/internal/traces/traces_test.go b/processor/transformprocessor/internal/traces/traces_test.go index e225e0f1f555..4db922fda269 100644 --- a/processor/transformprocessor/internal/traces/traces_test.go +++ b/processor/transformprocessor/internal/traces/traces_test.go @@ -458,6 +458,208 @@ func Test_newPathGetSetter(t *testing.T) { span.Status().SetMessage("bad span") }, }, + { + name: "resource attributes", + path: []common.Field{ + { + Name: "resource", + }, + { + Name: "attributes", + }, + }, + orig: refSpan.Attributes(), + new: newAttrs, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + resource.Attributes().Clear() + newAttrs.CopyTo(resource.Attributes()) + }, + }, + { + name: "resource attributes string", + path: []common.Field{ + { + Name: "resource", + }, + { + Name: "attributes", + MapKey: strp("str"), + }, + }, + orig: "val", + new: "newVal", + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + resource.Attributes().UpsertString("str", "newVal") + }, + }, + { + name: "resource attributes bool", + path: []common.Field{ + { + Name: "resource", + }, + { + Name: "attributes", + MapKey: strp("bool"), + }, + }, + orig: true, + new: false, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + resource.Attributes().UpsertBool("bool", false) + }, + }, + { + name: "resource attributes int", + path: []common.Field{ + { + Name: "resource", + }, + { + Name: "attributes", + MapKey: strp("int"), + }, + }, + orig: int64(10), + new: int64(20), + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + resource.Attributes().UpsertInt("int", 20) + }, + }, + { + name: "resource attributes float", + path: []common.Field{ + { + Name: "resource", + }, + { + Name: "attributes", + MapKey: strp("double"), + }, + }, + orig: float64(1.2), + new: float64(2.4), + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + resource.Attributes().UpsertDouble("double", 2.4) + }, + }, + { + name: "resource attributes bytes", + path: []common.Field{ + { + Name: "resource", + }, + { + Name: "attributes", + MapKey: strp("bytes"), + }, + }, + orig: []byte{1, 3, 2}, + new: []byte{2, 3, 4}, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + resource.Attributes().UpsertBytes("bytes", []byte{2, 3, 4}) + }, + }, + { + name: "resource attributes array string", + path: []common.Field{ + { + Name: "resource", + }, + { + Name: "attributes", + MapKey: strp("arr_str"), + }, + }, + orig: func() pdata.AttributeValueSlice { + val, _ := refSpan.Attributes().Get("arr_str") + return val.SliceVal() + }(), + new: []string{"new"}, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + resource.Attributes().Upsert("arr_str", newArrStr) + }, + }, + { + name: "resource attributes array bool", + path: []common.Field{ + { + Name: "resource", + }, + { + Name: "attributes", + MapKey: strp("arr_bool"), + }, + }, + orig: func() pdata.AttributeValueSlice { + val, _ := refSpan.Attributes().Get("arr_bool") + return val.SliceVal() + }(), + new: []bool{false}, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + resource.Attributes().Upsert("arr_bool", newArrBool) + }, + }, + { + name: "resource attributes array int", + path: []common.Field{ + { + Name: "resource", + }, + { + Name: "attributes", + MapKey: strp("arr_int"), + }, + }, + orig: func() pdata.AttributeValueSlice { + val, _ := refSpan.Attributes().Get("arr_int") + return val.SliceVal() + }(), + new: []int64{20}, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + resource.Attributes().Upsert("arr_int", newArrInt) + }, + }, + { + name: "resource attributes array float", + path: []common.Field{ + { + Name: "resource", + }, + { + Name: "attributes", + MapKey: strp("arr_float"), + }, + }, + orig: func() pdata.AttributeValueSlice { + val, _ := refSpan.Attributes().Get("arr_float") + return val.SliceVal() + }(), + new: []float64{2.0}, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + resource.Attributes().Upsert("arr_float", newArrFloat) + }, + }, + { + name: "resource attributes array bytes", + path: []common.Field{ + { + Name: "resource", + }, + { + Name: "attributes", + MapKey: strp("arr_bytes"), + }, + }, + orig: func() pdata.AttributeValueSlice { + val, _ := refSpan.Attributes().Get("arr_bytes") + return val.SliceVal() + }(), + new: [][]byte{{9, 6, 4}}, + modified: func(span pdata.Span, il pdata.InstrumentationLibrary, resource pdata.Resource) { + resource.Attributes().Upsert("arr_bytes", newArrBytes) + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From 43357f7baf21f5dca087446849709784fbbb90c5 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Wed, 2 Feb 2022 07:24:31 +0000 Subject: [PATCH 15/16] factory tests --- processor/transformprocessor/factory_test.go | 39 ++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/processor/transformprocessor/factory_test.go b/processor/transformprocessor/factory_test.go index ffb43d96cab7..cd255a5383d8 100644 --- a/processor/transformprocessor/factory_test.go +++ b/processor/transformprocessor/factory_test.go @@ -15,11 +15,15 @@ package transformprocessor import ( + "context" "testing" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configtest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/model/pdata" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/traces" ) @@ -49,3 +53,38 @@ func TestFactoryCreateTracesProcessor_Empty(t *testing.T) { err := cfg.Validate() assert.NoError(t, err) } + +func TestFactoryCreateTracesProcessor_InvalidActions(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.Traces.Queries = []string{`set(123`} + ap, err := factory.CreateTracesProcessor(context.Background(), componenttest.NewNopProcessorCreateSettings(), cfg, consumertest.NewNop()) + assert.Error(t, err) + assert.Nil(t, ap) +} + +func TestFactoryCreateTracesProcessor(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + oCfg := cfg.(*Config) + oCfg.Traces.Queries = []string{`set(attributes["test"], "pass") where name == "operationA"`} + + tp, err := factory.CreateTracesProcessor(context.Background(), componenttest.NewNopProcessorCreateSettings(), cfg, consumertest.NewNop()) + assert.NotNil(t, tp) + assert.NoError(t, err) + + td := pdata.NewTraces() + span := td.ResourceSpans().AppendEmpty().InstrumentationLibrarySpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("operationA") + + _, ok := span.Attributes().Get("test") + assert.False(t, ok) + + err = tp.ConsumeTraces(context.Background(), td) + assert.NoError(t, err) + + val, ok := span.Attributes().Get("test") + assert.True(t, ok) + assert.Equal(t, "pass", val.StringVal()) +} From 70a87b4c1b80c687c8d0762b23b3224a1f96142f Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Thu, 10 Feb 2022 05:40:39 +0000 Subject: [PATCH 16/16] Fix test --- .../transformprocessor/internal/traces/condition_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processor/transformprocessor/internal/traces/condition_test.go b/processor/transformprocessor/internal/traces/condition_test.go index bbbbdfefb3d7..df1860f0ffaa 100644 --- a/processor/transformprocessor/internal/traces/condition_test.go +++ b/processor/transformprocessor/internal/traces/condition_test.go @@ -89,9 +89,9 @@ func Test_newConditionEvaluator(t *testing.T) { }, }, Right: common.Value{ - String: strp("bear"), + String: strp("cat"), }, - Op: "==", + Op: "!=", }, matching: span, },