Skip to content

Commit

Permalink
Add support for composite sampling policy to the tailsampler (#4958)
Browse files Browse the repository at this point in the history
Is your feature request related to a problem? Please describe.
#1410
Design Doc - Added support for composite policy in tailsampling processor. This would help in grouping sampling policies and rate limiting them. https://docs.google.com/document/d/10wpIv3TtXgOik05smHm3nYeBX48Bj76TCMxPy8e1NZw/edit#heading=h.ecy5l2puwtp4
This is a split. Refer PR open-telemetry/opentelemetry-collector#1894 (comment)

Due to EasyCLA issue opening a new PR: #4396

Link to tracking Issue: 1306
  • Loading branch information
vikrambe authored Oct 21, 2021
1 parent 9e44b1f commit b90d1f0
Show file tree
Hide file tree
Showing 11 changed files with 819 additions and 5 deletions.
52 changes: 47 additions & 5 deletions processor/tailsamplingprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ Multiple policies exist today and it is straight forward to add more. These incl
- `status_code`: Sample based upon the status code (`OK`, `ERROR` or `UNSET`)
- `string_attribute`: Sample based on string attributes value matches, both exact and regex value matches are supported
- `rate_limiting`: Sample based on rate
- `composite`: Sample based on a combination of above samplers, with ordering and rate allocation per sampler. Rate allocation allocates certain percentages of spans per policy order.
For example if we have set max_total_spans_per_second as 100 then we can set rate_allocation as follows
1. test-composite-policy-1 = 50 % of max_total_spans_per_second = 50 spans_per_second
2. test-composite-policy-2 = 25 % of max_total_spans_per_second = 25 spans_per_second
3. To ensure remaining capacity is filled use always_sample as one of the policies

The following configuration options can also be modified:
- `decision_wait` (default = 30s): Wait time since the first span of a trace before making a sampling decision
Expand Down Expand Up @@ -73,13 +78,50 @@ processors:
name: test-policy-8,
type: rate_limiting,
rate_limiting: {spans_per_second: 35}
},
{
name: test-policy-8,
},
{
name: test-policy-9,
type: string_attribute,
string_attribute: {key: http.url, values: [\/health, \/metrics], enabled_regex_matching: true, invert_match: true}
}
]
},
{
name: composite-policy-1,
type: composite,
composite:
{
max_total_spans_per_second: 1000,
policy_order: [test-composite-policy-1, test-composite-policy-2, test-composite-policy-3],
composite_sub_policy:
[
{
name: test-composite-policy-1,
type: numeric_attribute,
numeric_attribute: {key: key1, min_value: 50, max_value: 100}
},
{
name: test-composite-policy-2,
type: string_attribute,
string_attribute: {key: key2, values: [value1, value2]}
},
{
name: test-composite-policy-3,
type: always_sample
}
],
rate_allocation:
[
{
policy: test-composite-policy-1,
percent: 50
},
{
policy: test-composite-policy-2,
percent: 25
}
]
}
},
]
```

Refer to [tail_sampling_config.yaml](./testdata/tail_sampling_config.yaml) for detailed
Expand Down
74 changes: 74 additions & 0 deletions processor/tailsamplingprocessor/composite_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tailsamplingprocessor

import (
"fmt"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
)

func getNewCompositePolicy(logger *zap.Logger, config CompositeCfg) (sampling.PolicyEvaluator, error) {
var subPolicyEvalParams []sampling.SubPolicyEvalParams
rateAllocationsMap := getRateAllocationMap(config)
for i := range config.SubPolicyCfg {
policyCfg := config.SubPolicyCfg[i]
policy, _ := getSubPolicyEvaluator(logger, &policyCfg)

evalParams := sampling.SubPolicyEvalParams{
Evaluator: policy,
MaxSpansPerSecond: int64(rateAllocationsMap[policyCfg.Name]),
}
subPolicyEvalParams = append(subPolicyEvalParams, evalParams)
}
return sampling.NewComposite(logger, config.MaxTotalSpansPerSecond, subPolicyEvalParams, sampling.MonotonicClock{}), nil
}

// Apply rate allocations to the sub-policies
func getRateAllocationMap(config CompositeCfg) map[string]float64 {
rateAllocationsMap := make(map[string]float64)
maxTotalSPS := float64(config.MaxTotalSpansPerSecond)
// Default SPS determined by equally diving number of sub policies
defaultSPS := maxTotalSPS / float64(len(config.SubPolicyCfg))
for _, rAlloc := range config.RateAllocation {
if rAlloc.Percent > 0 {
rateAllocationsMap[rAlloc.Policy] = (float64(rAlloc.Percent) / 100) * maxTotalSPS
} else {
rateAllocationsMap[rAlloc.Policy] = defaultSPS
}
}
return rateAllocationsMap
}

// Return instance of composite sub-policy
func getSubPolicyEvaluator(logger *zap.Logger, cfg *SubPolicyCfg) (sampling.PolicyEvaluator, error) {
switch cfg.Type {
case AlwaysSample:
return sampling.NewAlwaysSample(logger), nil
case NumericAttribute:
nafCfg := cfg.NumericAttributeCfg
return sampling.NewNumericAttributeFilter(logger, nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue), nil
case StringAttribute:
safCfg := cfg.StringAttributeCfg
return sampling.NewStringAttributeFilter(logger, safCfg.Key, safCfg.Values, safCfg.EnabledRegexMatching, safCfg.CacheMaxSize, safCfg.InvertMatch), nil
case RateLimiting:
rlfCfg := cfg.RateLimitingCfg
return sampling.NewRateLimiting(logger, rlfCfg.SpansPerSecond), nil
default:
return nil, fmt.Errorf("unknown sampling policy type %s", cfg.Type)
}
}
86 changes: 86 additions & 0 deletions processor/tailsamplingprocessor/composite_helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tailsamplingprocessor

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config"
"go.uber.org/zap"
)

func TestCompositeHelper(t *testing.T) {
cfg := &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
DecisionWait: 10 * time.Second,
NumTraces: 100,
ExpectedNewTracesPerSec: 10,
PolicyCfgs: []PolicyCfg{
{
Name: "composite-policy-1",
Type: Composite,
CompositeCfg: CompositeCfg{
MaxTotalSpansPerSecond: 1000,
PolicyOrder: []string{"test-composite-policy-1", "test-composite-policy-2", "test-composite-policy-3", "test-composite-policy-4", "test-composite-policy-5"},
SubPolicyCfg: []SubPolicyCfg{
{
Name: "test-composite-policy-1",
Type: NumericAttribute,
NumericAttributeCfg: NumericAttributeCfg{Key: "key1", MinValue: 50, MaxValue: 100},
},
{
Name: "test-composite-policy-2",
Type: StringAttribute,
StringAttributeCfg: StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}},
},
{
Name: "test-composite-policy-3",
Type: RateLimiting,
RateLimitingCfg: RateLimitingCfg{SpansPerSecond: 10},
},
{
Name: "test-composite-policy-4",
Type: AlwaysSample,
},
{
Name: "test-composite-policy-5",
},
},
RateAllocation: []RateAllocationCfg{
{
Policy: "test-composite-policy-1",
Percent: 50,
},
{
Policy: "test-composite-policy-2",
Percent: 25,
},
},
},
},
},
}
rlfCfg := cfg.PolicyCfgs[0].CompositeCfg
composite, e := getNewCompositePolicy(zap.NewNop(), rlfCfg)
require.NotNil(t, composite)
require.NotNil(t, cfg.ProcessorSettings)
require.Equal(t, 10*time.Second, cfg.DecisionWait)
require.Equal(t, uint64(100), cfg.NumTraces)
require.Equal(t, uint64(10), cfg.ExpectedNewTracesPerSec)
require.NoError(t, e)
// TBD add more assertions
}
37 changes: 37 additions & 0 deletions processor/tailsamplingprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,43 @@ const (
StringAttribute PolicyType = "string_attribute"
// RateLimiting allows all traces until the specified limits are satisfied.
RateLimiting PolicyType = "rate_limiting"
// Composite allows defining a composite policy, combining the other policies in one
Composite PolicyType = "composite"
)

// SubPolicyCfg holds the common configuration to all policies under composite policy.
type SubPolicyCfg struct {
// Name given to the instance of the policy to make easy to identify it in metrics and logs.
Name string `mapstructure:"name"`
// Type of the policy this will be used to match the proper configuration of the policy.
Type PolicyType `mapstructure:"type"`
// Configs for numeric attribute filter sampling policy evaluator.
NumericAttributeCfg NumericAttributeCfg `mapstructure:"numeric_attribute"`
// Configs for string attribute filter sampling policy evaluator.
StringAttributeCfg StringAttributeCfg `mapstructure:"string_attribute"`
// Configs for rate limiting filter sampling policy evaluator.
RateLimitingCfg RateLimitingCfg `mapstructure:"rate_limiting"`
// Configs for latency filter sampling policy evaluator.
LatencyCfg LatencyCfg `mapstructure:"latency"`
// Configs for status code filter sampling policy evaluator.
StatusCodeCfg StatusCodeCfg `mapstructure:"status_code"`
}

// CompositeCfg holds the configurable settings to create a composite
// sampling policy evaluator.
type CompositeCfg struct {
MaxTotalSpansPerSecond int64 `mapstructure:"max_total_spans_per_second"`
PolicyOrder []string `mapstructure:"policy_order"`
SubPolicyCfg []SubPolicyCfg `mapstructure:"composite_sub_policy"`
RateAllocation []RateAllocationCfg `mapstructure:"rate_allocation"`
}

// RateAllocationCfg used within composite policy
type RateAllocationCfg struct {
Policy string `mapstructure:"policy"`
Percent int64 `mapstructure:"percent"`
}

// PolicyCfg holds the common configuration to all policies.
type PolicyCfg struct {
// Name given to the instance of the policy to make easy to identify it in metrics and logs.
Expand All @@ -60,6 +95,8 @@ type PolicyCfg struct {
StringAttributeCfg StringAttributeCfg `mapstructure:"string_attribute"`
// Configs for rate limiting filter sampling policy evaluator.
RateLimitingCfg RateLimitingCfg `mapstructure:"rate_limiting"`
// Configs for defining composite policy
CompositeCfg CompositeCfg `mapstructure:"composite"`
}

// LatencyCfg holds the configurable settings to create a latency filter sampling policy
Expand Down
34 changes: 34 additions & 0 deletions processor/tailsamplingprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,40 @@ func TestLoadConfig(t *testing.T) {
Type: RateLimiting,
RateLimitingCfg: RateLimitingCfg{SpansPerSecond: 35},
},
{
Name: "composite-policy-1",
Type: Composite,
CompositeCfg: CompositeCfg{
MaxTotalSpansPerSecond: 1000,
PolicyOrder: []string{"test-composite-policy-1", "test-composite-policy-2", "test-composite-policy-3"},
SubPolicyCfg: []SubPolicyCfg{
{
Name: "test-composite-policy-1",
Type: NumericAttribute,
NumericAttributeCfg: NumericAttributeCfg{Key: "key1", MinValue: 50, MaxValue: 100},
},
{
Name: "test-composite-policy-2",
Type: StringAttribute,
StringAttributeCfg: StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}},
},
{
Name: "test-composite-policy-3",
Type: AlwaysSample,
},
},
RateAllocation: []RateAllocationCfg{
{
Policy: "test-composite-policy-1",
Percent: 50,
},
{
Policy: "test-composite-policy-2",
Percent: 25,
},
},
},
},
},
})
}
Loading

0 comments on commit b90d1f0

Please sign in to comment.