From 76b9477076953faf57cadcad0f349e1616b20751 Mon Sep 17 00:00:00 2001 From: Pierre Tessier Date: Wed, 29 Sep 2021 07:39:42 -0400 Subject: [PATCH] Add dynamic sampler support to rules based samplers (#317) The RulesBased sample is exclusive to the other samplers. This PR allows you to use Dynamic and EMADynamic samplers to determine the sample rate of a RulesBased sampler rule. Co-authored-by: Mike Goldsmth --- config/config_test.go | 12 +-- config/file_config.go | 29 ------- config/sampler_config.go | 40 ++++++++++ rules_complete.toml | 39 ++++++--- sample/rules.go | 75 ++++++++++++------ sample/rules_test.go | 166 +++++++++++++++++++++++++++++++++++++++ sample/sample.go | 27 +++---- 7 files changed, 302 insertions(+), 86 deletions(-) diff --git a/config/config_test.go b/config/config_test.go index aa05c19cd0..1cd31c1ab7 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -206,20 +206,20 @@ func TestReadRulesConfig(t *testing.T) { assert.NoError(t, err) switch r := d.(type) { case *RulesBasedSamplerConfig: - assert.Len(t, r.Rule, 3) + assert.Len(t, r.Rule, 4) var rule *RulesBasedSamplerRule rule = r.Rule[0] - assert.Equal(t, 1, rule.SampleRate) - assert.Equal(t, "500 errors", rule.Name) - assert.Len(t, rule.Condition, 2) - - rule = r.Rule[1] assert.True(t, rule.Drop) assert.Equal(t, 0, rule.SampleRate) assert.Len(t, rule.Condition, 1) + rule = r.Rule[1] + assert.Equal(t, 1, rule.SampleRate) + assert.Equal(t, "500 errors or slow", rule.Name) + assert.Len(t, rule.Condition, 2) + default: assert.Fail(t, "dataset4 should have a rules based sampler", d) } diff --git a/config/file_config.go b/config/file_config.go index 536ac5a997..587e356b51 100644 --- a/config/file_config.go +++ b/config/file_config.go @@ -24,35 +24,6 @@ type fileConfig struct { mux sync.RWMutex } -type RulesBasedSamplerCondition struct { - Field string - Operator string - Value interface{} -} - -func (r *RulesBasedSamplerCondition) String() string { - return fmt.Sprintf("%+v", *r) -} - -type RulesBasedSamplerRule struct { - Name string - SampleRate int - Drop bool - Condition []*RulesBasedSamplerCondition -} - -func (r *RulesBasedSamplerRule) String() string { - return fmt.Sprintf("%+v", *r) -} - -type RulesBasedSamplerConfig struct { - Rule []*RulesBasedSamplerRule -} - -func (r *RulesBasedSamplerConfig) String() string { - return fmt.Sprintf("%+v", *r) -} - type configContents struct { ListenAddr string `validate:"required"` PeerListenAddr string `validate:"required"` diff --git a/config/sampler_config.go b/config/sampler_config.go index 84c3bc299a..4812ccaba7 100644 --- a/config/sampler_config.go +++ b/config/sampler_config.go @@ -1,5 +1,9 @@ package config +import ( + "fmt" +) + type DeterministicSamplerConfig struct { SampleRate int `validate:"required,gte=1"` } @@ -36,3 +40,39 @@ type TotalThroughputSamplerConfig struct { AddSampleRateKeyToTrace bool AddSampleRateKeyToTraceField string `validate:"required_with=AddSampleRateKeyToTrace"` } + +type RulesBasedSamplerCondition struct { + Field string + Operator string + Value interface{} +} + +func (r *RulesBasedSamplerCondition) String() string { + return fmt.Sprintf("%+v", *r) +} + +type RulesBasedDownstreamSampler struct { + DynamicSampler *DynamicSamplerConfig + EMADynamicSampler *EMADynamicSamplerConfig + TotalThroughputSampler *TotalThroughputSamplerConfig +} + +type RulesBasedSamplerRule struct { + Name string + SampleRate int + Sampler *RulesBasedDownstreamSampler + Drop bool + Condition []*RulesBasedSamplerCondition +} + +func (r *RulesBasedSamplerRule) String() string { + return fmt.Sprintf("%+v", *r) +} + +type RulesBasedSamplerConfig struct { + Rule []*RulesBasedSamplerRule +} + +func (r *RulesBasedSamplerConfig) String() string { + return fmt.Sprintf("%+v", *r) +} diff --git a/rules_complete.toml b/rules_complete.toml index aabb5f7e06..acb738e850 100644 --- a/rules_complete.toml +++ b/rules_complete.toml @@ -208,27 +208,40 @@ SampleRate = 1 Sampler = "RulesBasedSampler" [[dataset4.rule]] - name = "500 errors" + name = "drop healtchecks" + drop = true + [[dataset4.rule.condition]] + field = "http.route" + operator = "=" + value = "/health-check" + + [[dataset4.rule]] + name = "500 errors or slow" SampleRate = 1 [[dataset4.rule.condition]] - field = "status_code" - operator = "=" - value = 500 + field = "status_code" + operator = "=" + value = 500 [[dataset4.rule.condition]] - field = "duration_ms" - operator = ">=" - value = 1000.789 + field = "duration_ms" + operator = ">=" + value = 1000.789 [[dataset4.rule]] - name = "drop 200 responses" - drop = true + name = "dynamic sample 200 responses" [[dataset4.rule.condition]] - field = "status_code" - operator = "=" - value = 200 + field = "status_code" + operator = "=" + value = 200 + [dataset4.rule.sampler.EMADynamicSampler] + Sampler = "EMADynamicSampler" + GoalSampleRate = 15 + FieldList = ["request.method", "request.route"] + AddSampleRateKeyToTrace = true + AddSampleRateKeyToTraceField = "meta.refinery.dynsampler_key" [[dataset4.rule]] - SampleRate = 10 # default when no rules match, if missing defaults to 1 + SampleRate = 10 # default when no rules match, if missing defaults to 10 [dataset5] diff --git a/sample/rules.go b/sample/rules.go index 4f035c3c4a..3a56d24040 100644 --- a/sample/rules.go +++ b/sample/rules.go @@ -11,9 +11,10 @@ import ( ) type RulesBasedSampler struct { - Config *config.RulesBasedSamplerConfig - Logger logger.Logger - Metrics metrics.Metrics + Config *config.RulesBasedSamplerConfig + Logger logger.Logger + Metrics metrics.Metrics + samplers map[string]Sampler } func (s *RulesBasedSampler) Start() error { @@ -24,6 +25,35 @@ func (s *RulesBasedSampler) Start() error { s.Metrics.Register("rulessampler_num_kept", "counter") s.Metrics.Register("rulessampler_sample_rate", "histogram") + s.samplers = make(map[string]Sampler) + + // Check if any rule has a downstream sampler and create it + for _, rule := range s.Config.Rule { + if rule.Sampler != nil { + var sampler Sampler + if rule.Sampler.DynamicSampler != nil { + sampler = &DynamicSampler{Config: rule.Sampler.DynamicSampler, Logger: s.Logger, Metrics: s.Metrics} + } else if rule.Sampler.EMADynamicSampler != nil { + sampler = &EMADynamicSampler{Config: rule.Sampler.EMADynamicSampler, Logger: s.Logger, Metrics: s.Metrics} + } else if rule.Sampler.TotalThroughputSampler != nil { + sampler = &TotalThroughputSampler{Config: rule.Sampler.TotalThroughputSampler, Logger: s.Logger, Metrics: s.Metrics} + } else { + s.Logger.Debug().WithFields(map[string]interface{}{ + "rule_name": rule.Name, + }).Logf("invalid or missing downstream sampler") + continue + } + + err := sampler.Start() + if err != nil { + s.Logger.Debug().WithFields(map[string]interface{}{ + "rule_name": rule.Name, + }).Logf("error creating downstream sampler: %s", err) + continue + } + s.samplers[rule.String()] = sampler + } + } return nil } @@ -34,24 +64,6 @@ func (s *RulesBasedSampler) GetSampleRate(trace *types.Trace) (rate uint, keep b for _, rule := range s.Config.Rule { var matched int - rate := uint(rule.SampleRate) - keep := !rule.Drop && rule.SampleRate > 0 && rand.Intn(rule.SampleRate) == 0 - - // no condition signifies the default - if rule.Condition == nil { - s.Metrics.Histogram("rulessampler_sample_rate", float64(rule.SampleRate)) - if keep { - s.Metrics.Increment("rulessampler_num_kept") - } else { - s.Metrics.Increment("rulessampler_num_dropped") - } - logger.WithFields(map[string]interface{}{ - "rate": rate, - "keep": keep, - "drop_rule": rule.Drop, - }).Logf("got sample rate and decision") - return rate, keep - } for _, condition := range rule.Condition { span: @@ -127,7 +139,25 @@ func (s *RulesBasedSampler) GetSampleRate(trace *types.Trace) (rate uint, keep b } } - if matched == len(rule.Condition) { + if rule.Condition == nil || matched == len(rule.Condition) { + var rate uint + var keep bool + + if rule.Sampler != nil { + var sampler Sampler + var found bool + if sampler, found = s.samplers[rule.String()]; !found { + logger.WithFields(map[string]interface{}{ + "rule_name": rule.Name, + }).Logf("could not find downstream sampler for rule: %s", rule.Name) + return 1, true + } + rate, keep = sampler.GetSampleRate(trace) + } else { + rate = uint(rule.SampleRate) + keep = !rule.Drop && rule.SampleRate > 0 && rand.Intn(rule.SampleRate) == 0 + } + s.Metrics.Histogram("rulessampler_sample_rate", float64(rule.SampleRate)) if keep { s.Metrics.Increment("rulessampler_num_kept") @@ -138,7 +168,6 @@ func (s *RulesBasedSampler) GetSampleRate(trace *types.Trace) (rate uint, keep b "rate": rate, "keep": keep, "drop_rule": rule.Drop, - "rule_name": rule.Name, }).Logf("got sample rate and decision") return rate, keep } diff --git a/sample/rules_test.go b/sample/rules_test.go index 2965c70285..6bcb5b747d 100644 --- a/sample/rules_test.go +++ b/sample/rules_test.go @@ -520,3 +520,169 @@ func TestRules(t *testing.T) { } } } + +func TestRulesWithDynamicSampler(t *testing.T) { + data := []TestRulesData{ + { + Rules: &config.RulesBasedSamplerConfig{ + Rule: []*config.RulesBasedSamplerRule{ + { + Name: "downstream-dynamic", + Condition: []*config.RulesBasedSamplerCondition{ + { + Field: "rule_test", + Operator: "=", + Value: int64(1), + }, + }, + Sampler: &config.RulesBasedDownstreamSampler{ + DynamicSampler: &config.DynamicSamplerConfig{ + SampleRate: 10, + FieldList: []string{"http.status_code"}, + AddSampleRateKeyToTrace: true, + AddSampleRateKeyToTraceField: "meta.key", + }, + }, + }, + }, + }, + Spans: []*types.Span{ + { + Event: types.Event{ + Data: map[string]interface{}{ + "rule_test": int64(1), + "http.status_code": "200", + }, + }, + }, + { + Event: types.Event{ + Data: map[string]interface{}{ + "rule_test": int64(1), + "http.status_code": "200", + }, + }, + }, + }, + ExpectedKeep: true, + ExpectedRate: 10, + }, + } + + for _, d := range data { + sampler := &RulesBasedSampler{ + Config: d.Rules, + Logger: &logger.NullLogger{}, + Metrics: &metrics.NullMetrics{}, + } + + trace := &types.Trace{} + + for _, span := range d.Spans { + trace.AddSpan(span) + } + + sampler.Start() + rate, keep := sampler.GetSampleRate(trace) + + assert.Equal(t, d.ExpectedRate, rate, d.Rules) + + // we can only test when we don't expect to keep the trace + if !d.ExpectedKeep { + assert.Equal(t, d.ExpectedKeep, keep, d.Rules) + } + + spans := trace.GetSpans() + assert.Len(t, spans, len(d.Spans), "should have the same number of spans as input") + for _, span := range spans { + assert.Equal(t, span.Event.Data, map[string]interface{}{ + "rule_test": int64(1), + "http.status_code": "200", + "meta.key": "200•,", + }, "should add the sampling key to all spans in the trace") + } + } +} + +func TestRulesWithEMADynamicSampler(t *testing.T) { + data := []TestRulesData{ + { + Rules: &config.RulesBasedSamplerConfig{ + Rule: []*config.RulesBasedSamplerRule{ + { + Name: "downstream-dynamic", + Condition: []*config.RulesBasedSamplerCondition{ + { + Field: "rule_test", + Operator: "=", + Value: int64(1), + }, + }, + Sampler: &config.RulesBasedDownstreamSampler{ + EMADynamicSampler: &config.EMADynamicSamplerConfig{ + GoalSampleRate: 10, + FieldList: []string{"http.status_code"}, + AddSampleRateKeyToTrace: true, + AddSampleRateKeyToTraceField: "meta.key", + }, + }, + }, + }, + }, + Spans: []*types.Span{ + { + Event: types.Event{ + Data: map[string]interface{}{ + "rule_test": int64(1), + "http.status_code": "200", + }, + }, + }, + { + Event: types.Event{ + Data: map[string]interface{}{ + "rule_test": int64(1), + "http.status_code": "200", + }, + }, + }, + }, + ExpectedKeep: true, + ExpectedRate: 10, + }, + } + + for _, d := range data { + sampler := &RulesBasedSampler{ + Config: d.Rules, + Logger: &logger.NullLogger{}, + Metrics: &metrics.NullMetrics{}, + } + + trace := &types.Trace{} + + for _, span := range d.Spans { + trace.AddSpan(span) + } + + sampler.Start() + rate, keep := sampler.GetSampleRate(trace) + + assert.Equal(t, d.ExpectedRate, rate, d.Rules) + + // we can only test when we don't expect to keep the trace + if !d.ExpectedKeep { + assert.Equal(t, d.ExpectedKeep, keep, d.Rules) + } + + spans := trace.GetSpans() + assert.Len(t, spans, len(d.Spans), "should have the same number of spans as input") + for _, span := range spans { + assert.Equal(t, span.Event.Data, map[string]interface{}{ + "rule_test": int64(1), + "http.status_code": "200", + "meta.key": "200•,", + }, "should add the sampling key to all spans in the trace") + } + } +} diff --git a/sample/sample.go b/sample/sample.go index 9fb4539cfb..eef4337ca5 100644 --- a/sample/sample.go +++ b/sample/sample.go @@ -11,6 +11,7 @@ import ( type Sampler interface { GetSampleRate(trace *types.Trace) (rate uint, keep bool) + Start() error } // SamplerFactory is used to create new samplers with common (injected) resources @@ -32,30 +33,26 @@ func (s *SamplerFactory) GetSamplerImplementationForDataset(dataset string) Samp switch c := c.(type) { case *config.DeterministicSamplerConfig: - ds := &DeterministicSampler{Config: c, Logger: s.Logger} - ds.Start() - sampler = ds + sampler = &DeterministicSampler{Config: c, Logger: s.Logger} case *config.DynamicSamplerConfig: - ds := &DynamicSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics} - ds.Start() - sampler = ds + sampler = &DynamicSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics} case *config.EMADynamicSamplerConfig: - ds := &EMADynamicSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics} - ds.Start() - sampler = ds + sampler = &EMADynamicSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics} case *config.RulesBasedSamplerConfig: - ds := &RulesBasedSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics} - ds.Start() - sampler = ds + sampler = &RulesBasedSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics} case *config.TotalThroughputSamplerConfig: - ds := &TotalThroughputSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics} - ds.Start() - sampler = ds + sampler = &TotalThroughputSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics} default: s.Logger.Error().Logf("unknown sampler type %T. Exiting.", c) os.Exit(1) } + err = sampler.Start() + if err != nil { + s.Logger.Debug().WithField("dataset", dataset).Logf("failed to start sampler") + return nil + } + s.Logger.Debug().WithField("dataset", dataset).Logf("created implementation for sampler type %T", c) return sampler