feat: send kept trace decision in a separate goroutine #1412

Merged
1 commit merged on Nov 7, 2024
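This change moves publishing of kept-trace decisions out of the single-threaded collect loop: decisions are written to a new buffered channel (`keptDecisionBuffer`, sized at 100,000) with a non-blocking send, and a dedicated `sendKeptDecisions` goroutine drains that channel and publishes to the kept-decision topic. Drop decisions get the same non-blocking treatment, and both paths increment a queue-full metric instead of blocking when a buffer is saturated. Below is a minimal sketch of that pattern; the type, function names, and buffer size are illustrative placeholders, not the actual Refinery code.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// decisionPublisher is a hypothetical stand-in for the pattern in this PR:
// decisions are enqueued on a buffered channel with a non-blocking send, and
// a dedicated goroutine drains the channel and publishes each message.
type decisionPublisher struct {
	buf chan string
}

func newDecisionPublisher(size int) *decisionPublisher {
	return &decisionPublisher{buf: make(chan string, size)}
}

// enqueue never blocks the caller; if the buffer is full the message is
// dropped and the caller can increment a "queue full" metric and log.
func (p *decisionPublisher) enqueue(msg string) bool {
	select {
	case p.buf <- msg:
		return true
	default:
		return false // buffer full
	}
}

// run drains the buffer until it is closed, publishing each message.
func (p *decisionPublisher) run(ctx context.Context, publish func(context.Context, string) error) {
	for msg := range p.buf {
		if err := publish(ctx, msg); err != nil {
			fmt.Println("failed to publish decision:", err)
		}
	}
}

func main() {
	p := newDecisionPublisher(8)
	go p.run(context.Background(), func(_ context.Context, msg string) error {
		fmt.Println("published:", msg)
		return nil
	})

	if !p.enqueue(`{"trace_id":"abc","kept":true}`) {
		fmt.Println("kept decision buffer full; dropping message")
	}

	time.Sleep(50 * time.Millisecond) // let the sender goroutine drain
	close(p.buf) // closing the channel ends the range loop in run
}
```

In the diff below, `publishTraceDecision` performs the non-blocking enqueue (incrementing `collector_kept_decisions_queue_full` or `collector_drop_decisions_queue_full` when a buffer is full), `sendKeptDecisions` drains `keptDecisionBuffer`, and `Stop` closes both channels so the sender goroutines exit.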
142 changes: 64 additions & 78 deletions collect/collect.go
@@ -113,8 +113,9 @@ type InMemCollector struct {
dropDecisionMessages chan string
keptDecisionMessages chan string

dropDecisionBatch chan string
hostname string
dropDecisionBatch chan string
keptDecisionBuffer chan string
hostname string
}

var inMemCollectorMetrics = []metrics.Metadata{
@@ -156,6 +157,10 @@ var inMemCollectorMetrics = []metrics.Metadata{
{Name: "collector_drop_decision_batch_count", Type: metrics.Histogram, Unit: metrics.Dimensionless, Description: "number of drop decisions sent in a batch"},
{Name: "collector_expired_traces_missing_decisions", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of decision spans forwarded for expired traces missing trace decision"},
{Name: "collector_expired_traces_orphans", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of expired traces missing trace decision when they are sent"},
{Name: "kept_decisions_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of kept decision message received"},
{Name: "drop_decisions_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of drop decision message received"},
{Name: "collector_kept_decisions_queue_full", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of times kept trace decision queue is full"},
{Name: "collector_drop_decisions_queue_full", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of times drop trace decision queue is full"},
}

func (i *InMemCollector) Start() error {
@@ -208,14 +213,16 @@ func (i *InMemCollector) Start() error {
i.PubSub.Subscribe(context.Background(), keptTraceDecisionTopic, i.signalKeptTraceDecisions)
i.PubSub.Subscribe(context.Background(), droppedTraceDecisionTopic, i.signalDroppedTraceDecisions)

i.dropDecisionBatch = make(chan string, 1000)
i.dropDecisionBatch = make(chan string, i.Config.GetCollectionConfig().MaxDropDecisionBatchSize*5)
i.keptDecisionBuffer = make(chan string, 100_000)
}

// spin up one collector because this is a single threaded collector
go i.collect()
go i.sendTraces()
// spin up a drop decision batch sender
go i.sendDropDecisions()
go i.sendKeptDecisions()

return nil
}
@@ -399,13 +406,6 @@ func (i *InMemCollector) collect() {
return
case <-i.redistributeTimer.Notify():
i.redistributeTraces(ctx)
case msg, ok := <-i.keptDecisionMessages:
if !ok {
// channel's been closed; we should shut down.
return
}

i.processKeptDecision(msg)
case sp, ok := <-i.fromPeer:
if !ok {
// channel's been closed; we should shut down.
@@ -841,54 +841,18 @@ func (i *InMemCollector) dealWithSentTrace(ctx context.Context, tr cache.TraceSe
// if we receive a proxy span after a trace decision has been made,
// we should just broadcast the decision again
if sp.IsDecisionSpan() {
var (
msg string
err error
)
topic := keptTraceDecisionTopic
if tr.Kept() {
// late span in this case won't get HasRoot
// this means the late span won't be decorated with some metadata
// like span count, event count, link count
msg, err = newKeptDecisionMessage(TraceDecision{
TraceID: sp.TraceID,
Kept: tr.Kept(),
KeptReason: keptReason,
SendReason: TraceSendLateSpan,
SampleRate: tr.Rate(),
Count: uint32(tr.SpanCount()),
EventCount: uint32(tr.SpanEventCount()),
LinkCount: uint32(tr.SpanLinkCount()),
})
if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"trace_id": sp.TraceID,
"kept": tr.Kept(),
"late_span": true,
}).Logf("Failed to create new kept decision message")
return
}
} else {
topic = droppedTraceDecisionTopic
msg, err = newDroppedDecisionMessage(sp.TraceID)
if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"trace_id": sp.TraceID,
"kept": tr.Kept(),
"late_span": true,
}).Logf("Failed to create new dropped decision message")
return
}
}

err = i.PubSub.Publish(ctx, topic, msg)
if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"trace_id": sp.TraceID,
"kept": tr.Kept(),
"late_span": true,
}).Logf("Failed to publish trace decision")
// late span in this case won't get HasRoot
td := TraceDecision{
TraceID: sp.TraceID,
Kept: tr.Kept(),
KeptReason: keptReason,
SendReason: TraceSendLateSpan,
SampleRate: tr.Rate(),
Count: uint32(tr.SpanCount()),
EventCount: uint32(tr.SpanEventCount()),
LinkCount: uint32(tr.SpanLinkCount()),
}
i.publishTraceDecision(ctx, td)
return
}

@@ -1077,6 +1041,7 @@ func (i *InMemCollector) Stop() error {

if !i.Config.GetCollectionConfig().EnableTraceLocality {
close(i.dropDecisionBatch)
close(i.keptDecisionBuffer)
}

return nil
@@ -1368,6 +1333,8 @@ func (i *InMemCollector) signalDroppedTraceDecisions(ctx context.Context, msg st
}

func (i *InMemCollector) processDropDecisions(msg string) {
i.Metrics.Increment("drop_decisions_received")

ids := newDroppedTraceDecision(msg)

if len(ids) == 0 {
@@ -1393,6 +1360,8 @@ func (i *InMemCollector) processDropDecisions(msg string) {
}

func (i *InMemCollector) processKeptDecision(msg string) {
i.Metrics.Increment("kept_decisions_received")

td, err := newKeptTraceDecision(msg)
if err != nil {
i.Logger.Error().Logf("Failed to unmarshal trace decision message. %s", err)
@@ -1513,33 +1482,50 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis

if td.Kept {
decisionMsg, err = newKeptDecisionMessage(td)
if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"trace_id": td.TraceID,
"kept": td.Kept,
"reason": td.KeptReason,
"sampler": td.SamplerKey,
"selector": td.SamplerSelector,
"error": err.Error(),
}).Logf("Failed to create trace decision message")
return
}

select {
case i.keptDecisionBuffer <- decisionMsg:
default:
i.Metrics.Increment("collector_kept_decisions_queue_full")
i.Logger.Warn().Logf("kept trace decision buffer is full. Dropping message")
}
return
} else {
// if we're dropping the trace, we should add it to the batch so we can send it later
i.dropDecisionBatch <- td.TraceID
select {
case i.dropDecisionBatch <- td.TraceID:
default:
i.Metrics.Increment("collector_drop_decisions_queue_full")
i.Logger.Warn().Logf("drop trace decision buffer is full. Dropping message")
}
return
}
}

if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"trace_id": td.TraceID,
"kept": td.Kept,
"reason": td.KeptReason,
"sampler": td.SamplerKey,
"selector": td.SamplerSelector,
"error": err.Error(),
}).Logf("Failed to create trace decision message")
func (i *InMemCollector) sendKeptDecisions() {
if i.Config.GetCollectionConfig().EnableTraceLocality {
return
}

err = i.PubSub.Publish(ctx, keptTraceDecisionTopic, decisionMsg)
if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"trace_id": td.TraceID,
"kept": td.Kept,
"reason": td.KeptReason,
"sampler": td.SamplerKey,
"selector": td.SamplerSelector,
"error": err.Error(),
}).Logf("Failed to publish trace decision")
ctx := context.Background()
for msg := range i.keptDecisionBuffer {
err := i.PubSub.Publish(ctx, keptTraceDecisionTopic, msg)
if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"error": err.Error(),
}).Logf("Failed to publish trace decision")
}

}
}

19 changes: 19 additions & 0 deletions collect/collect_test.go
@@ -135,6 +135,7 @@ func TestAddRootSpan(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -268,6 +269,7 @@ func TestOriginalSampleRateIsNotedInMetaField(t *testing.T) {
coll.fromPeer = make(chan *types.Span, 5)
coll.dropDecisionBatch = make(chan string, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendDropDecisions()
@@ -369,6 +371,7 @@ func TestTransmittedSpansShouldHaveASampleRateOfAtLeastOne(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -426,6 +429,7 @@ func TestAddSpan(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -532,6 +536,7 @@ func TestDryRunMode(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -758,6 +763,7 @@ func TestStableMaxAlloc(t *testing.T) {
coll.incoming = make(chan *types.Span, 1000)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 500)
coll.keptDecisionBuffer = make(chan string, 500)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -950,6 +956,7 @@ func TestAddCountsToRoot(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -1040,6 +1047,7 @@ func TestLateRootGetsCounts(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -1134,7 +1142,9 @@ func TestAddSpanCount(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)

go coll.collect()
go coll.sendTraces()

@@ -1225,6 +1235,7 @@ func TestLateRootGetsSpanCount(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -1303,6 +1314,7 @@ func TestLateSpanNotDecorated(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -1375,6 +1387,7 @@ func TestAddAdditionalAttributes(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -1536,6 +1549,7 @@ func TestStressReliefDecorateHostname(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -1643,6 +1657,7 @@ func TestSpanWithRuleReasons(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -1753,6 +1768,7 @@ func TestRedistributeTraces(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
@@ -1863,6 +1879,7 @@ func TestDrainTracesOnShutdown(t *testing.T) {
coll.fromPeer = make(chan *types.Span, 5)

coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)

sentTraceChan := make(chan sentRecord, 1)
@@ -1993,6 +2010,7 @@ func TestBigTracesGoEarly(t *testing.T) {
coll.incoming = make(chan *types.Span, 500)
coll.fromPeer = make(chan *types.Span, 500)
coll.outgoingTraces = make(chan sendableTrace, 500)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)
go coll.collect()
go coll.sendTraces()
@@ -2226,6 +2244,7 @@ func TestExpiredTracesCleanup(t *testing.T) {
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.keptDecisionBuffer = make(chan string, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)

for _, traceID := range peerTraceIDs {