Skip to content

Commit

Permalink
processor/filterprocessor: Ability to filter spans
Browse files Browse the repository at this point in the history
Signed-off-by: John <john.dorman@sony.com>
  • Loading branch information
boostchicken authored and jpkrohling committed Jan 18, 2022
1 parent add40a5 commit e2f2b63
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 0 deletions.
18 changes: 18 additions & 0 deletions processor/filterprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,21 @@ processors:
```

In case the no metric names are provided, `matric_names` being empty, the filtering is only done at resource level.

### Filter Spans from Traces

See the documentation in the [attribute processor](../attributesprocessor/README.md) for syntax

```yaml
processors:
filter:
spans:
exclude:
match_type: regexp
span_names:
- hello_world
- hello/world
attributes:
- Key: container.name
Value: (app_container_1|app_container_1)
```
22 changes: 22 additions & 0 deletions processor/filterprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package filterprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor"

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterset/regexp"
"go.opentelemetry.io/collector/config"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterconfig"
Expand All @@ -29,6 +30,8 @@ type Config struct {
Metrics MetricFilters `mapstructure:"metrics"`

Logs LogFilters `mapstructure:"logs"`

Spans SpanFilters `mapstructure:"spans"`
}

// MetricFilters filters by Metric properties.
Expand All @@ -42,6 +45,25 @@ type MetricFilters struct {
// all other metrics should be included.
// If both Include and Exclude are specified, Include filtering occurs first.
Exclude *filtermetric.MatchProperties `mapstructure:"exclude"`

// RegexpConfig specifies options for the Regexp match type
RegexpConfig *regexp.Config `mapstructure:"regexp"`
}

// MetricFilters filters by Metric properties.
type SpanFilters struct {
// Include match properties describe metrics that should be included in the Collector Service pipeline,
// all other metrics should be dropped from further processing.
// If both Include and Exclude are specified, Include filtering occurs first.
Include *filterconfig.MatchProperties `mapstructure:"include"`

// Exclude match properties describe metrics that should be excluded from the Collector Service pipeline,
// all other metrics should be included.
// If both Include and Exclude are specified, Include filtering occurs first.
Exclude *filterconfig.MatchProperties `mapstructure:"exclude"`

// RegexpConfig specifies options for the Regexp match type
RegexpConfig *regexp.Config `mapstructure:"regexp"`
}

// LogFilters filters by Log properties.
Expand Down
18 changes: 18 additions & 0 deletions processor/filterprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func NewFactory() component.ProcessorFactory {
createDefaultConfig,
processorhelper.WithMetrics(createMetricsProcessor),
processorhelper.WithLogs(createLogsProcessor),
processorhelper.WithTraces(createTracesProcessor),
)
}

Expand Down Expand Up @@ -79,3 +80,20 @@ func createLogsProcessor(
fp.ProcessLogs,
processorhelper.WithCapabilities(processorCapabilities))
}

func createTracesProcessor(
_ context.Context,
set component.ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Traces,
) (component.TracesProcessor, error) {
fp, err := newFilterTracesProcessor(set.Logger, cfg.(*Config))
if err != nil {
return nil, err
}
return processorhelper.NewTracesProcessor(
cfg,
nextConsumer,
fp.processTraces,
processorhelper.WithCapabilities(processorCapabilities))
}
104 changes: 104 additions & 0 deletions processor/filterprocessor/filter_processor_traces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filterprocessor

import (
"context"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterspan"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.uber.org/zap"
)

type filterSpanProcessor struct {
cfg *Config
include filterspan.Matcher
exclude filterspan.Matcher
logger *zap.Logger
}

func newFilterTracesProcessor(logger *zap.Logger, cfg *Config) (*filterSpanProcessor, error) {
inc, err := createSpanMatcher(cfg.Spans.Include)
if err != nil {
return nil, err
}

exc, err := createSpanMatcher(cfg.Spans.Exclude)
if err != nil {
return nil, err
}

includeMatchType := ""
var includeResourceAttributes []filterconfig.Attribute
if cfg.Spans.Include != nil {
includeMatchType = string(cfg.Spans.Include.MatchType)
}

excludeMatchType := ""
var excludeResourceAttributes []filterconfig.Attribute
if cfg.Metrics.Exclude != nil {
excludeMatchType = string(cfg.Spans.Exclude.MatchType)
}

logger.Info(
"Span filter configured",
zap.String("include match_type", includeMatchType),
zap.Any("include spans with resource attributes", includeResourceAttributes),
zap.String("exclude match_type", excludeMatchType),
zap.Any("exclude spans with resource attributes", excludeResourceAttributes),
)

return &filterSpanProcessor{
cfg: cfg,
include: inc,
exclude: exc,
logger: logger,
}, nil
}

func createSpanMatcher(sp *filterconfig.MatchProperties) (filterspan.Matcher, error) {
var matcher filterspan.Matcher
var err error
if sp == nil {
panic("No Match Properties for Filter")
}
matcher, err = filterspan.NewMatcher(sp)
return matcher, err
}

// processTraces filters the given spans of a traces based off the filterSpanProcessor's filters.
func (fsp *filterSpanProcessor) processTraces(_ context.Context, pdt pdata.Traces) (pdata.Traces, error) {

for i := 0; i < pdt.ResourceSpans().Len(); i++ {
resSpan := pdt.ResourceSpans().At(i)
for x := 0; x < resSpan.InstrumentationLibrarySpans().Len(); x++ {
ils := resSpan.InstrumentationLibrarySpans().At(x)
for spanCount := 0; spanCount < ils.Spans().Len(); spanCount++ {
ils.Spans().RemoveIf(func(span pdata.Span) bool {
return filterspan.SkipSpan(fsp.include, fsp.exclude, span, resSpan.Resource(), ils.InstrumentationLibrary())
})
}
}
resSpan.InstrumentationLibrarySpans().RemoveIf(func(spans pdata.InstrumentationLibrarySpans) bool {
return spans.Spans().Len() == 0
})
}
if pdt.ResourceSpans().Len() == 0 {
return pdt, processorhelper.ErrSkipProcessingData
}
return pdt, nil
}

0 comments on commit e2f2b63

Please sign in to comment.