Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new tail sampling processor policy: status_code #3754

Merged
merged 10 commits into from
Jun 24, 2021
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

## 💡 Enhancements 💡

- `tailsampling` processor: Add new policy `latency` (#3750)
- `tailsampling` processor:
- Add new policy `latency` (#3750)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the latency made it to 0.29.0, no?

Copy link
Member

@jpkrohling jpkrohling Jun 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually... 0.29.0 hasn't been released for contrib yet. I heard yesterday that the release train has departed already, though (#3863).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it still possible to get this PR in 0.29 or is the release already cut?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how to avoid a merge conflict with the release PR, should I wait until #3863 is merged to rebase and update the changelog?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @bogdandrutu mentioned yesterday that they were having build problems, blocking the release. I don't expect any new features to be included for 0.29.0. Wait for the release, then rebase and update the changelog.

- Add new policy `status_code` (#3754)
- `splunkhec` exporter: Include `trace_id` and `span_id` if set (#3850)

## v0.28.0
Expand Down
10 changes: 8 additions & 2 deletions processor/tailsamplingprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Multiple policies exist today and it is straight forward to add more. These incl
- `always_sample`: Sample all traces
- `latency`: Sample based on the duration of the trace. The duration is determined by looking at the earliest start time and latest end time, without taking into consideration what happened in between.
- `numeric_attribute`: Sample based on number attributes
- `status_code`: Sample based upon the status code (`OK`, `ERROR` or `UNSET`)
- `string_attribute`: Sample based on string attributes value matches, both exact and regex value matches are supported
- `rate_limiting`: Sample based on rate

Expand Down Expand Up @@ -49,16 +50,21 @@ processors:
},
{
name: test-policy-4,
type: status_code,
status_code: {status_codes: [ERROR, UNSET]}
},
{
name: test-policy-5,
type: string_attribute,
string_attribute: {key: key2, values: [value1, value2]}
},
{
name: test-policy-5,
name: test-policy-6,
type: string_attribute,
string_attribute: {key: key2, values: [value1, val*], enabled_regex_matching: true, cache_max_size: 10}
},
{
name: test-policy-6,
name: test-policy-7,
type: rate_limiting,
rate_limiting: {spans_per_second: 35}
}
Expand Down
10 changes: 10 additions & 0 deletions processor/tailsamplingprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
// NumericAttribute sample traces that have a given numeric attribute in a specified
// range, e.g.: attribute "http.status_code" >= 399 and <= 999.
NumericAttribute PolicyType = "numeric_attribute"
// StatusCode sample traces that have a given status code.
StatusCode PolicyType = "status_code"
// StringAttribute sample traces that a attribute, of type string, matching
// one of the listed values.
StringAttribute PolicyType = "string_attribute"
Expand All @@ -48,6 +50,8 @@ type PolicyCfg struct {
LatencyCfg LatencyCfg `mapstructure:"latency"`
// Configs for numeric attribute filter sampling policy evaluator.
NumericAttributeCfg NumericAttributeCfg `mapstructure:"numeric_attribute"`
// Configs for status code filter sampling policy evaluator.
StatusCodeCfg StatusCodeCfg `mapstructure:"status_code"`
// Configs for string attribute filter sampling policy evaluator.
StringAttributeCfg StringAttributeCfg `mapstructure:"string_attribute"`
// Configs for rate limiting filter sampling policy evaluator.
Expand All @@ -72,6 +76,12 @@ type NumericAttributeCfg struct {
MaxValue int64 `mapstructure:"max_value"`
}

// StatusCodeCfg holds the configurable settings to create a status code filter sampling
// policy evaluator.
type StatusCodeCfg struct {
StatusCodes []string `mapstructure:"status_codes"`
}

// StringAttributeCfg holds the configurable settings to create a string attribute filter
// sampling policy evaluator.
type StringAttributeCfg struct {
Expand Down
9 changes: 7 additions & 2 deletions processor/tailsamplingprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,17 @@ func TestLoadConfig(t *testing.T) {
NumericAttributeCfg: NumericAttributeCfg{Key: "key1", MinValue: 50, MaxValue: 100},
},
{
Name: "test-policy-4",
Name: "test-policy-4",
Type: StatusCode,
StatusCodeCfg: StatusCodeCfg{StatusCodes: []string{"ERROR", "UNSET"}},
},
{
Name: "test-policy-5",
Type: StringAttribute,
StringAttributeCfg: StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}},
},
{
Name: "test-policy-5",
Name: "test-policy-6",
Type: RateLimiting,
RateLimitingCfg: RateLimitingCfg{SpansPerSecond: 35},
},
Expand Down
3 changes: 3 additions & 0 deletions processor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ func getPolicyEvaluator(logger *zap.Logger, cfg *PolicyCfg) (sampling.PolicyEval
case StringAttribute:
safCfg := cfg.StringAttributeCfg
return sampling.NewStringAttributeFilter(logger, safCfg.Key, safCfg.Values, safCfg.EnabledRegexMatching, safCfg.CacheMaxSize), nil
case StatusCode:
scfCfg := cfg.StatusCodeCfg
return sampling.NewStatusCodeFilter(logger, scfCfg.StatusCodes)
case RateLimiting:
rlfCfg := cfg.RateLimitingCfg
return sampling.NewRateLimiting(logger, rlfCfg.SpansPerSecond), nil
Expand Down
27 changes: 1 addition & 26 deletions processor/tailsamplingprocessor/sampling/latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (l *latency) Evaluate(_ pdata.TraceID, traceData *TraceData) (Decision, err
var minTime pdata.Timestamp
var maxTime pdata.Timestamp

return inspectSpans(batches, func(span pdata.Span) bool {
return hasSpanWithCondition(batches, func(span pdata.Span) bool {
if minTime == 0 || span.StartTimestamp() < minTime {
minTime = span.StartTimestamp()
}
Expand All @@ -66,28 +66,3 @@ func (l *latency) Evaluate(_ pdata.TraceID, traceData *TraceData) (Decision, err
return duration.Milliseconds() >= l.thresholdMs
}), nil
}

// inspectSpans iterates through all the spans until any callback returns true.
func inspectSpans(batches []pdata.Traces, shouldSample func(span pdata.Span) bool) Decision {
for _, batch := range batches {
rspans := batch.ResourceSpans()

for i := 0; i < rspans.Len(); i++ {
rs := rspans.At(i)
ilss := rs.InstrumentationLibrarySpans()

for i := 0; i < ilss.Len(); i++ {
ils := ilss.At(i)

for j := 0; j < ils.Spans().Len(); j++ {
span := ils.Spans().At(j)

if shouldSample(span) {
return Sampled
}
}
}
}
}
return NotSampled
}
26 changes: 8 additions & 18 deletions processor/tailsamplingprocessor/sampling/numeric_tag_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,14 @@ func (naf *numericAttributeFilter) Evaluate(_ pdata.TraceID, trace *TraceData) (
trace.Lock()
batches := trace.ReceivedBatches
trace.Unlock()
for _, batch := range batches {
rspans := batch.ResourceSpans()
for i := 0; i < rspans.Len(); i++ {
rs := rspans.At(i)
ilss := rs.InstrumentationLibrarySpans()
for j := 0; j < ilss.Len(); j++ {
ils := ilss.At(j)
for k := 0; k < ils.Spans().Len(); k++ {
span := ils.Spans().At(k)
if v, ok := span.Attributes().Get(naf.key); ok {
value := v.IntVal()
if value >= naf.minValue && value <= naf.maxValue {
return Sampled, nil
}
}
}

return hasSpanWithCondition(batches, func(span pdata.Span) bool {
if v, ok := span.Attributes().Get(naf.key); ok {
value := v.IntVal()
if value >= naf.minValue && value <= naf.maxValue {
return true
}
}
}
return NotSampled, nil
return false
}), nil
}
85 changes: 85 additions & 0 deletions processor/tailsamplingprocessor/sampling/status_code.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sampling

import (
"errors"
"fmt"

"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)

type statusCodeFilter struct {
logger *zap.Logger
statusCodes []pdata.StatusCode
}

var _ PolicyEvaluator = (*statusCodeFilter)(nil)

// NewStatusCodeFilter creates a policy evaluator that samples all traces with
// a given status code.
func NewStatusCodeFilter(logger *zap.Logger, statusCodeString []string) (PolicyEvaluator, error) {
if len(statusCodeString) == 0 {
return nil, errors.New("expected at least one status code to filter on")
}

statusCodes := make([]pdata.StatusCode, len(statusCodeString))

for i := range statusCodeString {
switch statusCodeString[i] {
case "OK":
statusCodes[i] = pdata.StatusCodeOk
case "ERROR":
statusCodes[i] = pdata.StatusCodeError
case "UNSET":
statusCodes[i] = pdata.StatusCodeUnset
default:
return nil, fmt.Errorf("unknown status code %q, supported: OK, ERROR, UNSET", statusCodeString[i])
}
}

return &statusCodeFilter{
logger: logger,
statusCodes: statusCodes,
}, nil
}

// OnLateArrivingSpans notifies the evaluator that the given list of spans arrived
// after the sampling decision was already taken for the trace.
// This gives the evaluator a chance to log any message/metrics and/or update any
// related internal state.
func (r *statusCodeFilter) OnLateArrivingSpans(Decision, []*pdata.Span) error {
r.logger.Debug("Triggering action for late arriving spans in status code filter")
return nil
}

// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
func (r *statusCodeFilter) Evaluate(_ pdata.TraceID, trace *TraceData) (Decision, error) {
r.logger.Debug("Evaluating spans in status code filter")

trace.Lock()
batches := trace.ReceivedBatches
trace.Unlock()

return hasSpanWithCondition(batches, func(span pdata.Span) bool {
for _, statusCode := range r.statusCodes {
if span.Status().Code() == statusCode {
return true
}
}
return false
}), nil
}
101 changes: 101 additions & 0 deletions processor/tailsamplingprocessor/sampling/status_code_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sampling

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)

func TestNewStatusCodeFilter_errorHandling(t *testing.T) {
_, err := NewStatusCodeFilter(zap.NewNop(), []string{})
assert.Error(t, err, "expected at least one status code to filter on")

_, err = NewStatusCodeFilter(zap.NewNop(), []string{"OK", "ERR"})
assert.EqualError(t, err, "unknown status code \"ERR\", supported: OK, ERROR, UNSET")
}

func TestPercentageSampling(t *testing.T) {
traceID := pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})

cases := []struct {
Desc string
StatusCodesToFilterOn []string
StatusCodesPresent []pdata.StatusCode
Decision Decision
}{
{
Desc: "filter on ERROR - none match",
StatusCodesToFilterOn: []string{"ERROR"},
StatusCodesPresent: []pdata.StatusCode{pdata.StatusCodeOk, pdata.StatusCodeUnset, pdata.StatusCodeOk},
Decision: NotSampled,
},
{
Desc: "filter on OK and ERROR - none match",
StatusCodesToFilterOn: []string{"OK", "ERROR"},
StatusCodesPresent: []pdata.StatusCode{pdata.StatusCodeUnset, pdata.StatusCodeUnset},
Decision: NotSampled,
},
{
Desc: "filter on UNSET - matches",
StatusCodesToFilterOn: []string{"UNSET"},
StatusCodesPresent: []pdata.StatusCode{pdata.StatusCodeUnset},
Decision: Sampled,
},
{
Desc: "filter on OK and UNSET - matches",
StatusCodesToFilterOn: []string{"OK", "UNSET"},
StatusCodesPresent: []pdata.StatusCode{pdata.StatusCodeError, pdata.StatusCodeOk},
Decision: Sampled,
},
}

for _, c := range cases {
t.Run(c.Desc, func(t *testing.T) {
traces := pdata.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
ils := rs.InstrumentationLibrarySpans().AppendEmpty()

for _, statusCode := range c.StatusCodesPresent {
span := ils.Spans().AppendEmpty()
span.Status().SetCode(statusCode)
span.SetTraceID(pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
span.SetSpanID(pdata.NewSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}))
}

trace := &TraceData{
ReceivedBatches: []pdata.Traces{traces},
}

statusCodeFilter, err := NewStatusCodeFilter(zap.NewNop(), c.StatusCodesToFilterOn)
assert.NoError(t, err)

decision, err := statusCodeFilter.Evaluate(traceID, trace)
assert.NoError(t, err)
assert.Equal(t, c.Decision, decision)
})
}
}

func TestOnLateArrivingSpans_PercentageSampling(t *testing.T) {
statusCode, err := NewStatusCodeFilter(zap.NewNop(), []string{"ERROR"})
assert.Nil(t, err)

err = statusCode.OnLateArrivingSpans(NotSampled, nil)
assert.Nil(t, err)
}
Loading