Skip to content

Commit

Permalink
publish trace decision for late spans through a goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
VinozzZ committed Nov 6, 2024
1 parent 0a73586 commit cdb839f
Showing 1 changed file with 18 additions and 53 deletions.
71 changes: 18 additions & 53 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,13 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span, source
i.dealWithSentTrace(ctx, sr, keptReason, sp)
return
}

// if the span is sent for signaling expired traces,
// we should not add it to the cache
if sp.Data["meta.refinery.expired_trace"] != nil {
return
}

// trace hasn't already been sent (or this span is really old); let's
// create a new trace to hold it
span.SetAttributes(attribute.Bool("create_new_trace", true))
Expand Down Expand Up @@ -723,12 +730,6 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span, source
i.dealWithSentTrace(ctx, cache.NewKeptTraceCacheEntry(trace), "", sp)
}

// if the span is sent for signaling expired traces,
// we should not add it to the cache
if sp.Data["meta.refinery.expired_trace"] != nil {
return
}

// great! trace is live. add the span.
trace.AddSpan(sp)
span.SetAttributes(attribute.String("disposition", "live_trace"))
Expand Down Expand Up @@ -856,54 +857,18 @@ func (i *InMemCollector) dealWithSentTrace(ctx context.Context, tr cache.TraceSe
// if we receive a proxy span after a trace decision has been made,
// we should just broadcast the decision again
if sp.IsDecisionSpan() {
var (
msg string
err error
)
topic := keptTraceDecisionTopic
if tr.Kept() {
// late span in this case won't get HasRoot
// this means the late span won't be decorated with some metadata
// like span count, event count, link count
msg, err = newKeptDecisionMessage(TraceDecision{
TraceID: sp.TraceID,
Kept: tr.Kept(),
KeptReason: keptReason,
SendReason: TraceSendLateSpan,
SampleRate: tr.Rate(),
Count: uint32(tr.SpanCount()),
EventCount: uint32(tr.SpanEventCount()),
LinkCount: uint32(tr.SpanLinkCount()),
})
if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"trace_id": sp.TraceID,
"kept": tr.Kept(),
"late_span": true,
}).Logf("Failed to create new kept decision message")
return
}
} else {
topic = droppedTraceDecisionTopic
msg, err = newDroppedDecisionMessage(sp.TraceID)
if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"trace_id": sp.TraceID,
"kept": tr.Kept(),
"late_span": true,
}).Logf("Failed to create new dropped decision message")
return
}
}

err = i.PubSub.Publish(ctx, topic, msg)
if err != nil {
i.Logger.Error().WithFields(map[string]interface{}{
"trace_id": sp.TraceID,
"kept": tr.Kept(),
"late_span": true,
}).Logf("Failed to publish trace decision")
// late span in this case won't get HasRoot
td := TraceDecision{
TraceID: sp.TraceID,
Kept: tr.Kept(),
KeptReason: keptReason,
SendReason: TraceSendLateSpan,
SampleRate: tr.Rate(),
Count: uint32(tr.SpanCount()),
EventCount: uint32(tr.SpanEventCount()),
LinkCount: uint32(tr.SpanLinkCount()),
}
i.publishTraceDecision(ctx, td)
return
}

Expand Down

0 comments on commit cdb839f

Please sign in to comment.