Skip to content

Commit

Permalink
Store sampling.probability in sampled span attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
pmm-sumo committed Nov 25, 2020
1 parent 47ffc51 commit ae407ee
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ The following configuration options can be modified:
- `hash_seed` (no default): An integer used to compute the hash algorithm. Note that all collectors for a given tier (e.g. behind the same load balancer) should have the same hash_seed.
- `sampling_percentage` (default = 0): Percentage at which traces are sampled; >= 100 samples all traces

The sampled spans have [`sampling.probability`](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/sdk.md#sampling)
attribute added, which includes the value in range of `(0, 1.0]` representing the probability with which the record
was sampled. If the span was already sampled before and the attribute is present, the existing value is multiplied.

Examples:

```yaml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"
)

// samplingPriority has the semantic result of parsing the "sampling.priority"
Expand Down Expand Up @@ -49,9 +50,10 @@ const (
)

type tracesamplerprocessor struct {
nextConsumer consumer.TracesConsumer
scaledSamplingRate uint32
hashSeed uint32
nextConsumer consumer.TracesConsumer
scaledSamplingRate uint32
samplingProbability float64
hashSeed uint32
}

// newTraceProcessor returns a processor.TracesProcessor that will perform head sampling according to the given
Expand All @@ -64,8 +66,9 @@ func newTraceProcessor(nextConsumer consumer.TracesConsumer, cfg Config) (compon
return &tracesamplerprocessor{
nextConsumer: nextConsumer,
// Adjust sampling percentage on private so recalculations are avoided.
scaledSamplingRate: uint32(cfg.SamplingPercentage * percentageScaleFactor),
hashSeed: cfg.HashSeed,
scaledSamplingRate: uint32(cfg.SamplingPercentage * percentageScaleFactor),
samplingProbability: float64(cfg.SamplingPercentage) * 0.01,
hashSeed: cfg.HashSeed,
}, nil
}

Expand All @@ -82,6 +85,16 @@ func (tsp *tracesamplerprocessor) ConsumeTraces(ctx context.Context, td pdata.Tr
return tsp.nextConsumer.ConsumeTraces(ctx, sampledTraceData)
}

func (tsp *tracesamplerprocessor) updateSamplingProbability(sampledSpanAttributes pdata.AttributeMap) {
samplingProbability := tsp.samplingProbability
attr, found := sampledSpanAttributes.Get(conventions.AttributeSamplingProbability)
if found && attr.Type() == pdata.AttributeValueDOUBLE {
samplingProbability *= attr.DoubleVal()
}

sampledSpanAttributes.UpsertDouble(conventions.AttributeSamplingProbability, samplingProbability)
}

func (tsp *tracesamplerprocessor) processTraces(resourceSpans pdata.ResourceSpans, sampledTraceData pdata.Traces) {
scaledSamplingRate := tsp.scaledSamplingRate

Expand Down Expand Up @@ -115,6 +128,7 @@ func (tsp *tracesamplerprocessor) processTraces(resourceSpans pdata.ResourceSpan
hash(tidBytes[:], tsp.hashSeed)&bitMaskHashBuckets < scaledSamplingRate

if sampled {
tsp.updateSamplingProbability(span.Attributes())
spns.Append(span)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
)

Expand Down Expand Up @@ -71,6 +72,7 @@ func TestNewTraceProcessor(t *testing.T) {
if !tt.wantErr {
// The truncation below with uint32 cannot be defined at initialization (compiler error), performing it at runtime.
tt.want.(*tracesamplerprocessor).scaledSamplingRate = uint32(tt.cfg.SamplingPercentage * percentageScaleFactor)
tt.want.(*tracesamplerprocessor).samplingProbability = float64(tt.cfg.SamplingPercentage) * 0.01
}
got, err := newTraceProcessor(tt.nextConsumer, tt.cfg)
if (err != nil) != tt.wantErr {
Expand Down Expand Up @@ -227,15 +229,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange_MultipleResourceSpans(t

// Test_tracesamplerprocessor_SpanSamplingPriority checks if handling of "sampling.priority" is correct.
func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
singleSpanWithAttrib := func(key string, attribValue pdata.AttributeValue) pdata.Traces {
traces := pdata.NewTraces()
traces.ResourceSpans().Resize(1)
rs := traces.ResourceSpans().At(0)
rs.InstrumentationLibrarySpans().Resize(1)
instrLibrarySpans := rs.InstrumentationLibrarySpans().At(0)
instrLibrarySpans.Spans().Append(getSpanWithAttributes(key, attribValue))
return traces
}

tests := []struct {
name string
cfg Config
Expand All @@ -247,7 +241,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
cfg: Config{
SamplingPercentage: 0.0,
},
td: singleSpanWithAttrib(
td: getTracesWithSpanWithAttribute(
"sampling.priority",
pdata.NewAttributeValueInt(2)),
sampled: true,
Expand All @@ -257,7 +251,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
cfg: Config{
SamplingPercentage: 0.0,
},
td: singleSpanWithAttrib(
td: getTracesWithSpanWithAttribute(
"sampling.priority",
pdata.NewAttributeValueDouble(1)),
sampled: true,
Expand All @@ -267,7 +261,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
cfg: Config{
SamplingPercentage: 0.0,
},
td: singleSpanWithAttrib(
td: getTracesWithSpanWithAttribute(
"sampling.priority",
pdata.NewAttributeValueString("1")),
sampled: true,
Expand All @@ -277,7 +271,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
cfg: Config{
SamplingPercentage: 100.0,
},
td: singleSpanWithAttrib(
td: getTracesWithSpanWithAttribute(
"sampling.priority",
pdata.NewAttributeValueInt(0)),
},
Expand All @@ -286,7 +280,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
cfg: Config{
SamplingPercentage: 100.0,
},
td: singleSpanWithAttrib(
td: getTracesWithSpanWithAttribute(
"sampling.priority",
pdata.NewAttributeValueDouble(0)),
},
Expand All @@ -295,7 +289,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
cfg: Config{
SamplingPercentage: 100.0,
},
td: singleSpanWithAttrib(
td: getTracesWithSpanWithAttribute(
"sampling.priority",
pdata.NewAttributeValueString("0")),
},
Expand All @@ -304,7 +298,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
cfg: Config{
SamplingPercentage: 0.0,
},
td: singleSpanWithAttrib(
td: getTracesWithSpanWithAttribute(
"no.sampling.priority",
pdata.NewAttributeValueInt(2)),
},
Expand All @@ -313,7 +307,7 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
cfg: Config{
SamplingPercentage: 100.0,
},
td: singleSpanWithAttrib(
td: getTracesWithSpanWithAttribute(
"no.sampling.priority",
pdata.NewAttributeValueInt(2)),
sampled: true,
Expand Down Expand Up @@ -416,6 +410,59 @@ func Test_parseSpanSamplingPriority(t *testing.T) {
}
}

// Test_tracesamplerprocessor_SamplingProbabilityAttribute verifies if the attribute describing current sampling rate is included in sampled spans
func Test_tracesamplerprocessor_SamplingProbabilityAttribute(t *testing.T) {
cfg := Config{
SamplingPercentage: 100.0,
}

tests := []struct {
name string
traces pdata.Traces
wantSamplingProbabilityAttribute pdata.AttributeValue
}{
{
name: "simple_span",
traces: getTracesWithSpanWithAttribute("foo", pdata.NewAttributeValueString("bar")),
wantSamplingProbabilityAttribute: pdata.NewAttributeValueDouble(1.0),
},
{
name: "span_came_through_sampler_already",
traces: getTracesWithSpanWithAttribute(conventions.AttributeSamplingProbability, pdata.NewAttributeValueDouble(0.01)),
wantSamplingProbabilityAttribute: pdata.NewAttributeValueDouble(0.01),
},
{
name: "simple_with_invalid_attribute_value",
traces: getTracesWithSpanWithAttribute(conventions.AttributeSamplingProbability, pdata.NewAttributeValueString("bar")),
wantSamplingProbabilityAttribute: pdata.NewAttributeValueDouble(1.0),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sink := new(consumertest.TracesSink)
tsp, err := newTraceProcessor(sink, cfg)
if err != nil {
t.Errorf("error when creating tracesamplerprocessor: %v", err)
return
}

if err := tsp.ConsumeTraces(context.Background(), tt.traces); err != nil {
t.Errorf("tracesamplerprocessor.ConsumeTraceData() error = %v", err)
return
}
assert.Equal(t, 1, sink.SpansCount())
for _, td := range sink.AllTraces() {
span := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0)
attrValue, found := span.Attributes().Get(conventions.AttributeSamplingProbability)
assert.True(t, found, "Sampling probability attribute not found")
assert.Equal(t, tt.wantSamplingProbabilityAttribute, attrValue)
}
sink.Reset()
})
}
}

func getSpanWithAttributes(key string, value pdata.AttributeValue) pdata.Span {
span := pdata.NewSpan()
span.InitEmpty()
Expand All @@ -424,6 +471,16 @@ func getSpanWithAttributes(key string, value pdata.AttributeValue) pdata.Span {
return span
}

func getTracesWithSpanWithAttribute(key string, attribValue pdata.AttributeValue) pdata.Traces {
traces := pdata.NewTraces()
traces.ResourceSpans().Resize(1)
rs := traces.ResourceSpans().At(0)
rs.InstrumentationLibrarySpans().Resize(1)
instrLibrarySpans := rs.InstrumentationLibrarySpans().At(0)
instrLibrarySpans.Spans().Append(getSpanWithAttributes(key, attribValue))
return traces
}

// Test_hash ensures that the hash function supports different key lengths even if in
// practice it is only expected to receive keys with length 16 (trace id length in OC proto).
func Test_hash(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions translator/conventions/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const (
AttributeProcessExecutablePath = "process.executable.path"
AttributeProcessID = "process.pid"
AttributeProcessOwner = "process.owner"
AttributeSamplingProbability = "sampling.probability"
AttributeServiceInstance = "service.instance.id"
AttributeServiceName = "service.name"
AttributeServiceNamespace = "service.namespace"
Expand Down

0 comments on commit ae407ee

Please sign in to comment.