Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new tail sampling processor policy: latency #3750

Merged
merged 4 commits into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions processor/tailsamplingprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The following configuration options are required:

Multiple policies exist today and it is straight forward to add more. These include:
- `always_sample`: Sample all traces
- `latency`: Sample based on the duration of the trace. The duration is determined by looking at the earliest start time and latest end time, without taking into consideration what happened in between.
- `numeric_attribute`: Sample based on number attributes
- `string_attribute`: Sample based on string attributes value matches, both exact and regex value matches are supported
- `rate_limiting`: Sample based on rate
Expand All @@ -38,21 +39,26 @@ processors:
},
{
name: test-policy-2,
type: latency,
latency: {threshold_ms: 5000}
},
{
name: test-policy-3,
type: numeric_attribute,
numeric_attribute: {key: key1, min_value: 50, max_value: 100}
},
{
name: test-policy-3,
name: test-policy-4,
type: string_attribute,
string_attribute: {key: key2, values: [value1, value2]}
},
{
name: test-policy-3,
name: test-policy-5,
type: string_attribute,
string_attribute: {key: key2, values: [value1, val*], enabled_regex_matching: true, cache_max_size: 10}
},
{
name: test-policy-4,
name: test-policy-6,
type: rate_limiting,
rate_limiting: {spans_per_second: 35}
}
Expand Down
11 changes: 11 additions & 0 deletions processor/tailsamplingprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type PolicyType string
const (
// AlwaysSample samples all traces, typically used for debugging.
AlwaysSample PolicyType = "always_sample"
// Latency sample traces that are longer than a given threshold.
Latency PolicyType = "latency"
// NumericAttribute sample traces that have a given numeric attribute in a specified
// range, e.g.: attribute "http.status_code" >= 399 and <= 999.
NumericAttribute PolicyType = "numeric_attribute"
Expand All @@ -42,6 +44,8 @@ type PolicyCfg struct {
Name string `mapstructure:"name"`
// Type of the policy this will be used to match the proper configuration of the policy.
Type PolicyType `mapstructure:"type"`
// Configs for latency filter sampling policy evaluator.
LatencyCfg LatencyCfg `mapstructure:"latency"`
// Configs for numeric attribute filter sampling policy evaluator.
NumericAttributeCfg NumericAttributeCfg `mapstructure:"numeric_attribute"`
// Configs for string attribute filter sampling policy evaluator.
Expand All @@ -50,6 +54,13 @@ type PolicyCfg struct {
RateLimitingCfg RateLimitingCfg `mapstructure:"rate_limiting"`
}

// LatencyCfg holds the configurable settings to create a latency filter sampling policy
// evaluator
type LatencyCfg struct {
// ThresholdMs in milliseconds.
ThresholdMs int64 `mapstructure:"threshold_ms"`
}

// NumericAttributeCfg holds the configurable settings to create a numeric attribute filter
// sampling policy evaluator.
type NumericAttributeCfg struct {
Expand Down
11 changes: 8 additions & 3 deletions processor/tailsamplingprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,22 @@ func TestLoadConfig(t *testing.T) {
Type: AlwaysSample,
},
{
Name: "test-policy-2",
Name: "test-policy-2",
Type: Latency,
LatencyCfg: LatencyCfg{ThresholdMs: 5000},
},
{
Name: "test-policy-3",
Type: NumericAttribute,
NumericAttributeCfg: NumericAttributeCfg{Key: "key1", MinValue: 50, MaxValue: 100},
},
{
Name: "test-policy-3",
Name: "test-policy-4",
Type: StringAttribute,
StringAttributeCfg: StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}},
},
{
Name: "test-policy-4",
Name: "test-policy-5",
Type: RateLimiting,
RateLimitingCfg: RateLimitingCfg{SpansPerSecond: 35},
},
Expand Down
3 changes: 3 additions & 0 deletions processor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func getPolicyEvaluator(logger *zap.Logger, cfg *PolicyCfg) (sampling.PolicyEval
switch cfg.Type {
case AlwaysSample:
return sampling.NewAlwaysSample(logger), nil
case Latency:
lfCfg := cfg.LatencyCfg
return sampling.NewLatency(logger, lfCfg.ThresholdMs), nil
case NumericAttribute:
nafCfg := cfg.NumericAttributeCfg
return sampling.NewNumericAttributeFilter(logger, nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue), nil
Expand Down
93 changes: 93 additions & 0 deletions processor/tailsamplingprocessor/sampling/latency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sampling

import (
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)

type latency struct {
logger *zap.Logger
thresholdMs int64
}

var _ PolicyEvaluator = (*latency)(nil)

// NewLatency creates a policy evaluator sampling traces with a duration higher than a configured threshold
func NewLatency(logger *zap.Logger, thresholdMs int64) PolicyEvaluator {
return &latency{
logger: logger,
thresholdMs: thresholdMs,
}
}

// OnLateArrivingSpans notifies the evaluator that the given list of spans arrived
// after the sampling decision was already taken for the trace.
// This gives the evaluator a chance to log any message/metrics and/or update any
// related internal state.
func (l *latency) OnLateArrivingSpans(Decision, []*pdata.Span) error {
l.logger.Debug("Triggering action for late arriving spans in latency filter")
return nil
}

// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
func (l *latency) Evaluate(_ pdata.TraceID, traceData *TraceData) (Decision, error) {
l.logger.Debug("Evaluating spans in latency filter")

traceData.Lock()
batches := traceData.ReceivedBatches
traceData.Unlock()

var minTime pdata.Timestamp
var maxTime pdata.Timestamp

return inspectSpans(batches, func(span pdata.Span) bool {
if minTime == 0 || span.StartTimestamp() < minTime {
minTime = span.StartTimestamp()
}
if maxTime == 0 || span.EndTimestamp() > maxTime {
maxTime = span.EndTimestamp()
}

duration := maxTime.AsTime().Sub(minTime.AsTime())
return duration.Milliseconds() >= l.thresholdMs
}), nil
}

// inspectSpans iterates through all the spans until any callback returns true.
func inspectSpans(batches []pdata.Traces, shouldSample func(span pdata.Span) bool) Decision {
for _, batch := range batches {
rspans := batch.ResourceSpans()

for i := 0; i < rspans.Len(); i++ {
rs := rspans.At(i)
ilss := rs.InstrumentationLibrarySpans()

for i := 0; i < ilss.Len(); i++ {
ils := ilss.At(i)

for j := 0; j < ils.Spans().Len(); j++ {
span := ils.Spans().At(j)

if shouldSample(span) {
return Sampled
}
}
}
}
}
return NotSampled
}
112 changes: 112 additions & 0 deletions processor/tailsamplingprocessor/sampling/latency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sampling

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)

func TestEvaluate_Latency(t *testing.T) {
filter := NewLatency(zap.NewNop(), 5000)

traceID := pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
now := time.Now()

cases := []struct {
Desc string
Spans []spanWithTimeAndDuration
Decision Decision
}{
{
"trace duration shorter than threshold",
[]spanWithTimeAndDuration{
{
StartTime: now,
Duration: 4500 * time.Millisecond,
},
},
NotSampled,
},
{
"trace duration is equal to threshold",
[]spanWithTimeAndDuration{
{
StartTime: now,
Duration: 5000 * time.Millisecond,
},
},
Sampled,
},
{
"total trace duration is longer than threshold but every single span is shorter",
[]spanWithTimeAndDuration{
{
StartTime: now,
Duration: 3000 * time.Millisecond,
},
{
StartTime: now.Add(2500 * time.Millisecond),
Duration: 3000 * time.Millisecond,
},
},
Sampled,
},
}

for _, c := range cases {
t.Run(c.Desc, func(t *testing.T) {
decision, err := filter.Evaluate(traceID, newTraceWithSpans(c.Spans))

assert.NoError(t, err)
assert.Equal(t, decision, c.Decision)
})
}
}

func TestOnLateArrivingSpans_Latency(t *testing.T) {
filter := NewLatency(zap.NewNop(), 5000)
err := filter.OnLateArrivingSpans(NotSampled, nil)
assert.Nil(t, err)
}

type spanWithTimeAndDuration struct {
StartTime time.Time
Duration time.Duration
}

func newTraceWithSpans(spans []spanWithTimeAndDuration) *TraceData {
var traceBatches []pdata.Traces
traces := pdata.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
ils := rs.InstrumentationLibrarySpans().AppendEmpty()

for _, s := range spans {
span := ils.Spans().AppendEmpty()
span.SetTraceID(pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
span.SetSpanID(pdata.NewSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}))
span.SetStartTimestamp(pdata.TimestampFromTime(s.StartTime))
span.SetEndTimestamp(pdata.TimestampFromTime(s.StartTime.Add(s.Duration)))
}

traceBatches = append(traceBatches, traces)
return &TraceData{
ReceivedBatches: traceBatches,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@ processors:
},
{
name: test-policy-2,
type: latency,
latency: {threshold_ms: 5000}
},
{
name: test-policy-3,
type: numeric_attribute,
numeric_attribute: {key: key1, min_value: 50, max_value: 100}
},
{
name: test-policy-3,
name: test-policy-4,
type: string_attribute,
string_attribute: {key: key2, values: [value1, value2]}
},
{
name: test-policy-4,
name: test-policy-5,
type: rate_limiting,
rate_limiting: {spans_per_second: 35}
},
Expand Down