fix: deal with orphan traces and expired traces #1408

Merged · 19 commits · Nov 7, 2024
130 changes: 77 additions & 53 deletions collect/collect.go
@@ -297,15 +297,17 @@ func (i *InMemCollector) checkAlloc(ctx context.Context) {
totalDataSizeSent := 0
tracesSent := generics.NewSet[string]()
// Send the traces we can't keep.
traceTimeout := i.Config.GetTracesConfig().GetTraceTimeout()
for _, trace := range allTraces {
if _, ok := i.IsMyTrace(trace.ID()); !ok {
// only eject traces that belong to this peer or the trace is an orphan
if _, ok := i.IsMyTrace(trace.ID()); !ok && !trace.IsOrphan(traceTimeout, i.Clock.Now()) {
i.Logger.Debug().WithFields(map[string]interface{}{
"trace_id": trace.ID(),
}).Logf("cannot eject trace that does not belong to this peer")

continue
}
td, err := i.makeDecision(trace, TraceSendEjectedMemsize)
td, err := i.makeDecision(ctx, trace, TraceSendEjectedMemsize)
if err != nil {
continue
}
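
Note that `IsOrphan` and the `Retried` flag used later in this diff are not shown here; they live in the types package. The sketch below is an assumption about their shape, inferred from the inline `traceTimeout*4` check that this PR replaces in `sendExpiredTracesInCache` further down, not code from this PR.

```go
package types

import "time"

// Sketch only: the handful of fields this change relies on. The real
// types.Trace carries many more fields; Retried is assumed to be the flag
// backing the "only retry it once" logic in sendExpiredTracesInCache below.
type Trace struct {
	TraceID string
	SendBy  time.Time // deadline after which the trace counts as expired
	Retried bool      // set once a fresh decision has been requested from the decider
}

// IsOrphan reports whether a trace has waited so long past its timeout that
// the collector should decide it locally instead of waiting on its decider.
// The 4x threshold is an assumption, taken from the old inline check
// (timeoutDuration > traceTimeout*4) that this PR removes.
func (t *Trace) IsOrphan(traceTimeout time.Duration, now time.Time) bool {
	return now.Sub(t.SendBy) >= traceTimeout*4
}
```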
@@ -412,58 +414,48 @@ func (i *InMemCollector) collect() {
span.End()
return
}
i.processSpan(ctx, sp)
i.processSpan(ctx, sp, "peer")
default:
select {
case <-i.done:
span.End()
return
case msg, ok := <-i.dropDecisionMessages:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
}

i.processDropDecisions(msg)
case msg, ok := <-i.keptDecisionMessages:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
}

i.processKeptDecision(msg)
case <-ticker.C:
select {
case <-i.done:
span.End()
return
default:
i.sendExpiredTracesInCache(ctx, i.Clock.Now())
i.checkAlloc(ctx)

// Briefly unlock the cache, to allow test access.
_, span3 := otelutil.StartSpan(ctx, i.Tracer, "Gosched")
i.mutex.Unlock()
runtime.Gosched()
i.mutex.Lock()
span3.End()
}
case <-i.redistributeTimer.Notify():
i.redistributeTraces(ctx)
i.sendExpiredTracesInCache(ctx, i.Clock.Now())
i.checkAlloc(ctx)

// maybe only do this if in test mode?
// Briefly unlock the cache, to allow test access.
_, goSchedSpan := otelutil.StartSpan(ctx, i.Tracer, "Gosched")
i.mutex.Unlock()
runtime.Gosched()
i.mutex.Lock()
goSchedSpan.End()
case sp, ok := <-i.incoming:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
}
i.processSpan(ctx, sp)
i.processSpan(ctx, sp, "incoming")
case sp, ok := <-i.fromPeer:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
}
i.processSpan(ctx, sp)
i.processSpan(ctx, sp, "peer")
case <-i.reload:
i.reloadConfigs()
}
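
A note on the restructured loop: the fragment above suggests a nested select in which the outer select gives shutdown and peer-forwarded spans priority, and only when neither is ready does the default branch fall into the inner select that services decision messages, the ticker (which now appears to drive sendExpiredTracesInCache and checkAlloc), the redistribution timer, regular span intake, and config reloads. Below is a stripped-down sketch of that priority pattern only; the channel names are placeholders, not the collector's real fields.

```go
package sketch

// loop illustrates the prioritized nested-select pattern used by collect():
// urgent work is drained first, but is also included in the inner select so it
// is never starved while the loop blocks on regular work.
func loop(done <-chan struct{}, urgent, regular <-chan string, handle func(string)) {
	for {
		select {
		case <-done:
			return
		case msg := <-urgent: // e.g. spans forwarded from peers
			handle(msg)
		default:
			// Nothing urgent pending; block on everything.
			select {
			case <-done:
				return
			case msg := <-urgent:
				handle(msg)
			case msg := <-regular: // e.g. decision messages, ticker work, incoming spans
				handle(msg)
			}
		}
	}
}
```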
@@ -475,7 +467,7 @@ func (i *InMemCollector) collect() {
}

func (i *InMemCollector) redistributeTraces(ctx context.Context) {
_, span := otelutil.StartSpan(ctx, i.Tracer, "redistributeTraces")
ctx, span := otelutil.StartSpan(ctx, i.Tracer, "redistributeTraces")
redistrubutionStartTime := i.Clock.Now()

defer func() {
@@ -503,23 +495,21 @@ func (i *InMemCollector) redistributeTraces(ctx context.Context) {
if trace == nil {
continue
}
_, span2 := otelutil.StartSpanWith(ctx, i.Tracer, "distributeTrace", "num_spans", trace.DescendantCount())
_, redistributeTraceSpan := otelutil.StartSpanWith(ctx, i.Tracer, "distributeTrace", "num_spans", trace.DescendantCount())

newTarget := i.Sharder.WhichShard(trace.TraceID)

span2.SetAttributes(attribute.String("shard", newTarget.GetAddress()))
redistributeTraceSpan.SetAttributes(attribute.String("shard", newTarget.GetAddress()))

if newTarget.Equals(i.Sharder.MyShard()) {
span2.SetAttributes(attribute.Bool("self", true))
span2.End()
redistributeTraceSpan.SetAttributes(attribute.Bool("self", true))
redistributeTraceSpan.End()
continue
}

span2.SetAttributes(attribute.String("shard", newTarget.GetAddress()))

// if the ownership of the trace hasn't changed, we don't need to forward new decision spans
if newTarget.GetAddress() == trace.DeciderShardAddr {
span2.End()
redistributeTraceSpan.End()
continue
}

@@ -552,7 +542,7 @@ func (i *InMemCollector) redistributeTraces(ctx context.Context) {
}

forwardedTraces.Add(trace.TraceID)
span2.End()
redistributeTraceSpan.End()
}

otelutil.AddSpanFields(span, map[string]interface{}{
@@ -569,9 +559,12 @@ func (i *InMemCollector) redistributeTraces(ctx context.Context) {

func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time.Time) {
ctx, span := otelutil.StartSpan(ctx, i.Tracer, "sendExpiredTracesInCache")
defer span.End()

startTime := time.Now()
defer func() {
i.Metrics.Histogram("collector_send_expired_traces_in_cache_dur_ms", time.Since(startTime).Milliseconds())
span.End()
}()

expiredTraces := make([]*types.Trace, 0)
traceTimeout := i.Config.GetTracesConfig().GetTraceTimeout()
var orphanTraceCount int
@@ -580,18 +573,19 @@ func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time.
return true
}

timeoutDuration := now.Sub(t.SendBy)
// if a trace has expired more than 4 times the trace timeout, we should just make a decision for it
// if the trace is an orphan trace, we should just make a decision for it
// instead of waiting for the decider node
if timeoutDuration > traceTimeout*4 {
if t.IsOrphan(traceTimeout, now) {
orphanTraceCount++
return true
}

// if a trace has expired more than 2 times the trace timeout, we should forward it to its decider
// and wait for the decider to publish the trace decision again
if timeoutDuration > traceTimeout*2 {
// only retry it once
if now.Sub(t.SendBy) > traceTimeout*2 && !t.Retried {
expiredTraces = append(expiredTraces, t)
t.Retried = true
}

// by returning false we will not remove the trace from the cache
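
To make the two thresholds concrete (the durations below are made up): with a 60-second trace timeout, a trace whose SendBy passed 130 seconds ago is past twice the timeout, so it is forwarded to its decider once and marked Retried; one whose SendBy passed 250 seconds ago is past four times the timeout, counts as an orphan, and gets a local decision. A tiny runnable illustration, reusing the assumed IsOrphan sketch from above:

```go
package main

import (
	"fmt"
	"time"
)

// trace is a stand-in for types.Trace; only SendBy matters for this example.
type trace struct{ sendBy time.Time }

// isOrphan mirrors the assumed 4x-trace-timeout rule sketched earlier.
func (t trace) isOrphan(traceTimeout time.Duration, now time.Time) bool {
	return now.Sub(t.sendBy) >= traceTimeout*4
}

func main() {
	traceTimeout := 60 * time.Second
	now := time.Now()

	stale := trace{sendBy: now.Add(-130 * time.Second)}  // past 2x timeout, not yet 4x
	orphan := trace{sendBy: now.Add(-250 * time.Second)} // past 4x timeout

	fmt.Println(stale.isOrphan(traceTimeout, now))  // false: forward to decider once, set Retried
	fmt.Println(orphan.isOrphan(traceTimeout, now)) // true: make a decision locally
}
```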
@@ -610,50 +604,56 @@ func (i *InMemCollector) sendExpiredTracesInCache(ctx context.Context, now time.
var totalSpansSent int64

for _, t := range traces {
ctx, sendExpiredTraceSpan := otelutil.StartSpan(ctx, i.Tracer, "sendExpiredTrace")
totalSpansSent += int64(t.DescendantCount())

if t.RootSpan != nil {
td, err := i.makeDecision(t, TraceSendGotRoot)
td, err := i.makeDecision(ctx, t, TraceSendGotRoot)
if err != nil {
sendExpiredTraceSpan.End()
continue
}
i.send(ctx, t, td)
} else {
if spanLimit > 0 && t.DescendantCount() > spanLimit {
td, err := i.makeDecision(t, TraceSendSpanLimit)
td, err := i.makeDecision(ctx, t, TraceSendSpanLimit)
if err != nil {
sendExpiredTraceSpan.End()
continue
}
i.send(ctx, t, td)
} else {
td, err := i.makeDecision(t, TraceSendExpired)
td, err := i.makeDecision(ctx, t, TraceSendExpired)
if err != nil {
sendExpiredTraceSpan.End()
continue
}
i.send(ctx, t, td)
}
}

sendExpiredTraceSpan.End()
}

for _, trace := range expiredTraces {
// if a trace has expired and it doesn't belong to this peer, we should ask its decider to
// publish the trace decision again
i.PeerTransmission.EnqueueEvent(i.createDecisionSpan(&types.Span{
dc := i.createDecisionSpan(&types.Span{
TraceID: trace.ID(),
Event: types.Event{
Context: trace.GetSpans()[0].Context,
APIKey: trace.APIKey,
Dataset: trace.Dataset,
},
}, trace, i.Sharder.WhichShard(trace.ID())))
}, trace, i.Sharder.WhichShard(trace.ID()))
dc.Data["meta.refinery.expired_trace"] = true
i.PeerTransmission.EnqueueEvent(dc)
}
span.SetAttributes(attribute.Int64("total_spans_sent", totalSpansSent))
}
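
Connecting the two halves of this change: the loop above asks a trace's decider to re-publish its decision by enqueueing a decision span tagged meta.refinery.expired_trace, and the processSpan changes further down use the same tag to make sure that signaling span is never cached as real span data. A hedged sketch of the convention; only the tag key comes from this diff, and the event shape below is a simplified placeholder rather than types.Event.

```go
package main

import "fmt"

// event is a simplified stand-in for types.Event; only the Data map matters here.
type event struct{ Data map[string]interface{} }

func main() {
	// Sender side (sendExpiredTracesInCache): tag the decision span before
	// handing it to the peer transmission queue.
	dc := event{Data: map[string]interface{}{}}
	dc.Data["meta.refinery.expired_trace"] = true

	// Receiver side (processSpan on the decider): a tagged span is a signal
	// only — re-publish any existing decision, but never add it to the cache.
	if dc.Data["meta.refinery.expired_trace"] != nil {
		fmt.Println("expired-trace signal: do not cache this span")
	}
}
```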

// processSpan does all the stuff necessary to take an incoming span and add it
// to (or create a new placeholder for) a trace.
func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span) {
func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span, source string) {
ctx, span := otelutil.StartSpan(ctx, i.Tracer, "processSpan")
defer func() {
i.Metrics.Increment("span_processed")
Expand Down Expand Up @@ -688,6 +688,13 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span) {
i.dealWithSentTrace(ctx, sr, keptReason, sp)
return
}

// if the span is sent for signaling expired traces,
// we should not add it to the cache
if sp.Data["meta.refinery.expired_trace"] != nil {
return
}

// trace hasn't already been sent (or this span is really old); let's
// create a new trace to hold it
span.SetAttributes(attribute.Bool("create_new_trace", true))
@@ -727,15 +734,22 @@ func (i *InMemCollector) processSpan(ctx context.Context, sp *types.Span) {
i.dealWithSentTrace(ctx, cache.NewKeptTraceCacheEntry(trace), "", sp)
}

// if the span is sent for signaling expired traces,
// we should not add it to the cache
if sp.Data["meta.refinery.expired_trace"] != nil {
return
}

// great! trace is live. add the span.
trace.AddSpan(sp)
span.SetAttributes(attribute.String("disposition", "live_trace"))

var spanForwarded bool
// if this trace doesn't belong to us and it's not in sent state, we should forward a decision span to its decider
if !trace.Sent && !isMyTrace {
i.Metrics.Increment("incoming_router_peer")
i.Metrics.Increment(source + "_router_peer")
i.Logger.Debug().
WithString("peer", targetShard.GetAddress()).
Logf("Sending span to peer")

dc := i.createDecisionSpan(sp, trace, targetShard)
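
A small observability side effect of the new source parameter: the forwarded-to-peer counter is now split by where the span arrived. Assuming the only call sites are the "incoming" and "peer" ones visible in collect() above, these are the metric names produced:

```go
package main

import "fmt"

func main() {
	// Metric names produced by i.Metrics.Increment(source + "_router_peer")
	// for the two source labels passed in collect().
	for _, source := range []string{"incoming", "peer"} {
		fmt.Println(source + "_router_peer") // incoming_router_peer, peer_router_peer
	}
}
```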
@@ -956,6 +970,8 @@ func (i *InMemCollector) send(ctx context.Context, trace *types.Trace, td *Trace
return
}
trace.Sent = true
_, span := otelutil.StartSpan(ctx, i.Tracer, "send")
defer span.End()

traceDur := i.Clock.Since(trace.ArrivalTime)
i.Metrics.Histogram("trace_duration_ms", float64(traceDur.Milliseconds()))
@@ -1374,6 +1390,7 @@ func (i *InMemCollector) processKeptDecision(msg string) {
i.Logger.Error().Logf("Failed to unmarshal trace decision message. %s", err)
return
}

toDelete := generics.NewSet[string]()
trace := i.cache.Get(td.TraceID)
// if we don't have the trace in the cache, we don't need to do anything
@@ -1391,13 +1408,12 @@ func (i *InMemCollector) processKeptDecision(msg string) {

i.cache.RemoveTraces(toDelete)
}
func (i *InMemCollector) makeDecision(trace *types.Trace, sendReason string) (*TraceDecision, error) {

func (i *InMemCollector) makeDecision(ctx context.Context, trace *types.Trace, sendReason string) (*TraceDecision, error) {
if trace.Sent {
return nil, errors.New("trace already sent")
}

ctx, span := otelutil.StartSpan(context.Background(), i.Tracer, "makeDecision")
ctx, span := otelutil.StartSpan(ctx, i.Tracer, "makeDecision")
defer span.End()
i.Metrics.Histogram("trace_span_count", float64(trace.DescendantCount()))

@@ -1482,6 +1498,14 @@ func (i *InMemCollector) IsMyTrace(traceID string) (sharder.Shard, bool) {
}

func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecision) {
start := time.Now()
defer func() {
i.Metrics.Histogram("collector_publish_trace_decision_dur_ms", time.Since(start).Milliseconds())
}()

_, span := otelutil.StartSpanWith(ctx, i.Tracer, "publishTraceDecision", "decision", td.Kept)
defer span.End()

var (
decisionMsg string
err error