Skip to content

Commit

Permalink
tailsamplingprocessor: add latency policy
Browse files Browse the repository at this point in the history
  • Loading branch information
Koenraad Verheyden committed Jun 10, 2021
1 parent 18cec46 commit 54603c7
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 9 deletions.
12 changes: 9 additions & 3 deletions processor/tailsamplingprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The following configuration options are required:

Multiple policies exist today and it is straight forward to add more. These include:
- `always_sample`: Sample all traces
- `latency`: Sample based on duration
- `numeric_attribute`: Sample based on number attributes
- `string_attribute`: Sample based on string attributes value matches, both exact and regex value matches are supported
- `rate_limiting`: Sample based on rate
Expand All @@ -38,21 +39,26 @@ processors:
},
{
name: test-policy-2,
type: latency,
latency: {threshold_ms: 5000}
},
{
name: test-policy-3,
type: numeric_attribute,
numeric_attribute: {key: key1, min_value: 50, max_value: 100}
},
{
name: test-policy-3,
name: test-policy-4,
type: string_attribute,
string_attribute: {key: key2, values: [value1, value2]}
},
{
name: test-policy-3,
name: test-policy-5,
type: string_attribute,
string_attribute: {key: key2, values: [value1, val*], enabled_regex_matching: true, cache_max_size: 10}
},
{
name: test-policy-4,
name: test-policy-6,
type: rate_limiting,
rate_limiting: {spans_per_second: 35}
}
Expand Down
11 changes: 11 additions & 0 deletions processor/tailsamplingprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type PolicyType string
const (
// AlwaysSample samples all traces, typically used for debugging.
AlwaysSample PolicyType = "always_sample"
// Latency sample traces that are longer than a given threshold.
Latency PolicyType = "latency"
// NumericAttribute sample traces that have a given numeric attribute in a specified
// range, e.g.: attribute "http.status_code" >= 399 and <= 999.
NumericAttribute PolicyType = "numeric_attribute"
Expand All @@ -42,6 +44,8 @@ type PolicyCfg struct {
Name string `mapstructure:"name"`
// Type of the policy this will be used to match the proper configuration of the policy.
Type PolicyType `mapstructure:"type"`
// Configs for latency filter sampling policy evaluator.
LatencyCfg LatencyCfg `mapstructure:"latency"`
// Configs for numeric attribute filter sampling policy evaluator.
NumericAttributeCfg NumericAttributeCfg `mapstructure:"numeric_attribute"`
// Configs for string attribute filter sampling policy evaluator.
Expand All @@ -50,6 +54,13 @@ type PolicyCfg struct {
RateLimitingCfg RateLimitingCfg `mapstructure:"rate_limiting"`
}

// LatencyCfg holds the configurable settings to create a latency filter sampling policy
// evaluator
type LatencyCfg struct {
// ThresholdMs in milliseconds.
ThresholdMs int64 `mapstructure:"threshold_ms"`
}

// NumericAttributeCfg holds the configurable settings to create a numeric attribute filter
// sampling policy evaluator.
type NumericAttributeCfg struct {
Expand Down
11 changes: 8 additions & 3 deletions processor/tailsamplingprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,22 @@ func TestLoadConfig(t *testing.T) {
Type: AlwaysSample,
},
{
Name: "test-policy-2",
Name: "test-policy-2",
Type: Latency,
LatencyCfg: LatencyCfg{ThresholdMs: 5000},
},
{
Name: "test-policy-3",
Type: NumericAttribute,
NumericAttributeCfg: NumericAttributeCfg{Key: "key1", MinValue: 50, MaxValue: 100},
},
{
Name: "test-policy-3",
Name: "test-policy-4",
Type: StringAttribute,
StringAttributeCfg: StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}},
},
{
Name: "test-policy-4",
Name: "test-policy-5",
Type: RateLimiting,
RateLimitingCfg: RateLimitingCfg{SpansPerSecond: 35},
},
Expand Down
3 changes: 3 additions & 0 deletions processor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func getPolicyEvaluator(logger *zap.Logger, cfg *PolicyCfg) (sampling.PolicyEval
switch cfg.Type {
case AlwaysSample:
return sampling.NewAlwaysSample(logger), nil
case Latency:
lfCfg := cfg.LatencyCfg
return sampling.NewLatency(logger, lfCfg.ThresholdMs), nil
case NumericAttribute:
nafCfg := cfg.NumericAttributeCfg
return sampling.NewNumericAttributeFilter(logger, nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue), nil
Expand Down
79 changes: 79 additions & 0 deletions processor/tailsamplingprocessor/sampling/latency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sampling

import (
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)

type latency struct {
logger *zap.Logger
thresholdMs int64
}

var _ PolicyEvaluator = (*latency)(nil)

// NewLatency creates a policy evaluator the samples all traces.
func NewLatency(logger *zap.Logger, thresholdMs int64) PolicyEvaluator {
return &latency{
logger: logger,
thresholdMs: thresholdMs,
}
}

// OnLateArrivingSpans notifies the evaluator that the given list of spans arrived
// after the sampling decision was already taken for the trace.
// This gives the evaluator a chance to log any message/metrics and/or update any
// related internal state.
func (l *latency) OnLateArrivingSpans(Decision, []*pdata.Span) error {
l.logger.Debug("Triggering action for late arriving spans in latency filter")
return nil
}

// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
func (l *latency) Evaluate(traceID pdata.TraceID, traceData *TraceData) (Decision, error) {
l.logger.Debug("Evaluating spans in latency filter")

traceData.Lock()
batches := traceData.ReceivedBatches
traceData.Unlock()

for _, batch := range batches {
rspans := batch.ResourceSpans()

for i := 0; i < rspans.Len(); i++ {
rs := rspans.At(i)
ilss := rs.InstrumentationLibrarySpans()

for i := 0; i < ilss.Len(); i++ {
ils := ilss.At(i)

for j := 0; j < ils.Spans().Len(); j++ {
span := ils.Spans().At(j)

startTime := span.StartTimestamp().AsTime()
endTime := span.EndTimestamp().AsTime()

duration := endTime.Sub(startTime)
if duration.Milliseconds() >= l.thresholdMs {
return Sampled, nil
}
}
}
}
}
return NotSampled, nil
}
62 changes: 62 additions & 0 deletions processor/tailsamplingprocessor/sampling/latency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sampling

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)

func TestEvaluate_Latency(t *testing.T) {
filter := NewLatency(zap.NewNop(), 5000)

traceID := pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})

decision, err := filter.Evaluate(traceID, newTraceWithDuration(4500*time.Millisecond))
assert.Nil(t, err)
assert.Equal(t, decision, NotSampled)

decision, err = filter.Evaluate(traceID, newTraceWithDuration(5500*time.Millisecond))
assert.Nil(t, err)
assert.Equal(t, decision, Sampled)
}

func TestOnLateArrivingSpans_Latency(t *testing.T) {
filter := NewLatency(zap.NewNop(), 5000)
err := filter.OnLateArrivingSpans(NotSampled, nil)
assert.Nil(t, err)
}

func newTraceWithDuration(duration time.Duration) *TraceData {
now := time.Now()

var traceBatches []pdata.Traces
traces := pdata.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
ils := rs.InstrumentationLibrarySpans().AppendEmpty()
span := ils.Spans().AppendEmpty()
span.SetTraceID(pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
span.SetSpanID(pdata.NewSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}))
span.SetStartTimestamp(pdata.TimestampFromTime(now))
span.SetEndTimestamp(pdata.TimestampFromTime(now.Add(duration)))
traceBatches = append(traceBatches, traces)
return &TraceData{
ReceivedBatches: traceBatches,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,23 @@ processors:
name: test-policy-1,
type: always_sample
},
{
name: test-policy-2,
type: latency,
latency: {threshold_ms: 5000}
},
{
name: test-policy-2,
name: test-policy-3,
type: numeric_attribute,
numeric_attribute: {key: key1, min_value: 50, max_value: 100}
},
{
name: test-policy-3,
name: test-policy-4,
type: string_attribute,
string_attribute: {key: key2, values: [value1, value2]}
},
{
name: test-policy-4,
name: test-policy-5,
type: rate_limiting,
rate_limiting: {spans_per_second: 35}
},
Expand Down

0 comments on commit 54603c7

Please sign in to comment.