From e2f2b634b4df452280875c959ce30b2c6466a2c5 Mon Sep 17 00:00:00 2001 From: John Date: Tue, 16 Nov 2021 01:09:40 -0800 Subject: [PATCH] processor/filterprocessor: Ability to filter spans Signed-off-by: John --- processor/filterprocessor/README.md | 18 +++ processor/filterprocessor/config.go | 22 ++++ processor/filterprocessor/factory.go | 18 +++ .../filter_processor_traces.go | 104 ++++++++++++++++++ 4 files changed, 162 insertions(+) create mode 100644 processor/filterprocessor/filter_processor_traces.go diff --git a/processor/filterprocessor/README.md b/processor/filterprocessor/README.md index e0763c4b8f3d..7ec6add59374 100644 --- a/processor/filterprocessor/README.md +++ b/processor/filterprocessor/README.md @@ -204,3 +204,21 @@ processors: ``` In case the no metric names are provided, `matric_names` being empty, the filtering is only done at resource level. + +### Filter Spans from Traces + +See the documentation in the [attribute processor](../attributesprocessor/README.md) for syntax + +```yaml +processors: + filter: + spans: + exclude: + match_type: regexp + span_names: + - hello_world + - hello/world + attributes: + - Key: container.name + Value: (app_container_1|app_container_1) +``` diff --git a/processor/filterprocessor/config.go b/processor/filterprocessor/config.go index 799ae9f8c374..969e53386f21 100644 --- a/processor/filterprocessor/config.go +++ b/processor/filterprocessor/config.go @@ -15,6 +15,7 @@ package filterprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor" import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterset/regexp" "go.opentelemetry.io/collector/config" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterconfig" @@ -29,6 +30,8 @@ type Config struct { Metrics MetricFilters `mapstructure:"metrics"` Logs LogFilters `mapstructure:"logs"` + + Spans SpanFilters `mapstructure:"spans"` } // MetricFilters filters by Metric properties. @@ -42,6 +45,25 @@ type MetricFilters struct { // all other metrics should be included. // If both Include and Exclude are specified, Include filtering occurs first. Exclude *filtermetric.MatchProperties `mapstructure:"exclude"` + + // RegexpConfig specifies options for the Regexp match type + RegexpConfig *regexp.Config `mapstructure:"regexp"` +} + +// MetricFilters filters by Metric properties. +type SpanFilters struct { + // Include match properties describe metrics that should be included in the Collector Service pipeline, + // all other metrics should be dropped from further processing. + // If both Include and Exclude are specified, Include filtering occurs first. + Include *filterconfig.MatchProperties `mapstructure:"include"` + + // Exclude match properties describe metrics that should be excluded from the Collector Service pipeline, + // all other metrics should be included. + // If both Include and Exclude are specified, Include filtering occurs first. + Exclude *filterconfig.MatchProperties `mapstructure:"exclude"` + + // RegexpConfig specifies options for the Regexp match type + RegexpConfig *regexp.Config `mapstructure:"regexp"` } // LogFilters filters by Log properties. diff --git a/processor/filterprocessor/factory.go b/processor/filterprocessor/factory.go index 7a5e98a29199..fbaaf89ab7f2 100644 --- a/processor/filterprocessor/factory.go +++ b/processor/filterprocessor/factory.go @@ -37,6 +37,7 @@ func NewFactory() component.ProcessorFactory { createDefaultConfig, processorhelper.WithMetrics(createMetricsProcessor), processorhelper.WithLogs(createLogsProcessor), + processorhelper.WithTraces(createTracesProcessor), ) } @@ -79,3 +80,20 @@ func createLogsProcessor( fp.ProcessLogs, processorhelper.WithCapabilities(processorCapabilities)) } + +func createTracesProcessor( + _ context.Context, + set component.ProcessorCreateSettings, + cfg config.Processor, + nextConsumer consumer.Traces, +) (component.TracesProcessor, error) { + fp, err := newFilterTracesProcessor(set.Logger, cfg.(*Config)) + if err != nil { + return nil, err + } + return processorhelper.NewTracesProcessor( + cfg, + nextConsumer, + fp.processTraces, + processorhelper.WithCapabilities(processorCapabilities)) +} diff --git a/processor/filterprocessor/filter_processor_traces.go b/processor/filterprocessor/filter_processor_traces.go new file mode 100644 index 000000000000..154e3658b7ec --- /dev/null +++ b/processor/filterprocessor/filter_processor_traces.go @@ -0,0 +1,104 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package filterprocessor + +import ( + "context" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterconfig" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterspan" + "go.opentelemetry.io/collector/model/pdata" + "go.opentelemetry.io/collector/processor/processorhelper" + "go.uber.org/zap" +) + +type filterSpanProcessor struct { + cfg *Config + include filterspan.Matcher + exclude filterspan.Matcher + logger *zap.Logger +} + +func newFilterTracesProcessor(logger *zap.Logger, cfg *Config) (*filterSpanProcessor, error) { + inc, err := createSpanMatcher(cfg.Spans.Include) + if err != nil { + return nil, err + } + + exc, err := createSpanMatcher(cfg.Spans.Exclude) + if err != nil { + return nil, err + } + + includeMatchType := "" + var includeResourceAttributes []filterconfig.Attribute + if cfg.Spans.Include != nil { + includeMatchType = string(cfg.Spans.Include.MatchType) + } + + excludeMatchType := "" + var excludeResourceAttributes []filterconfig.Attribute + if cfg.Metrics.Exclude != nil { + excludeMatchType = string(cfg.Spans.Exclude.MatchType) + } + + logger.Info( + "Span filter configured", + zap.String("include match_type", includeMatchType), + zap.Any("include spans with resource attributes", includeResourceAttributes), + zap.String("exclude match_type", excludeMatchType), + zap.Any("exclude spans with resource attributes", excludeResourceAttributes), + ) + + return &filterSpanProcessor{ + cfg: cfg, + include: inc, + exclude: exc, + logger: logger, + }, nil +} + +func createSpanMatcher(sp *filterconfig.MatchProperties) (filterspan.Matcher, error) { + var matcher filterspan.Matcher + var err error + if sp == nil { + panic("No Match Properties for Filter") + } + matcher, err = filterspan.NewMatcher(sp) + return matcher, err +} + +// processTraces filters the given spans of a traces based off the filterSpanProcessor's filters. +func (fsp *filterSpanProcessor) processTraces(_ context.Context, pdt pdata.Traces) (pdata.Traces, error) { + + for i := 0; i < pdt.ResourceSpans().Len(); i++ { + resSpan := pdt.ResourceSpans().At(i) + for x := 0; x < resSpan.InstrumentationLibrarySpans().Len(); x++ { + ils := resSpan.InstrumentationLibrarySpans().At(x) + for spanCount := 0; spanCount < ils.Spans().Len(); spanCount++ { + ils.Spans().RemoveIf(func(span pdata.Span) bool { + return filterspan.SkipSpan(fsp.include, fsp.exclude, span, resSpan.Resource(), ils.InstrumentationLibrary()) + }) + } + } + resSpan.InstrumentationLibrarySpans().RemoveIf(func(spans pdata.InstrumentationLibrarySpans) bool { + return spans.Spans().Len() == 0 + }) + } + if pdt.ResourceSpans().Len() == 0 { + return pdt, processorhelper.ErrSkipProcessingData + } + return pdt, nil +}