Skip to content

Commit

Permalink
feat: add a limit to queue draining logic (#1441)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

The timer may never fire due to Go scheduler starving the timer
goroutine.
To prevent the timer being starved, this PR adds a limit to the number
of spans being processed during one iteration of the collect loop

## Short description of the changes

- add a limit to the draining logic
  • Loading branch information
VinozzZ authored Nov 20, 2024
1 parent fc9f0eb commit 9ff7bb2
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func (i *InMemCollector) collect() {
span.End()
return
}
count := drainSpanQueue(ctx, sp, i.fromPeer, "peer", i.processSpan)
count := drainSpanQueue(ctx, sp, i.fromPeer, "peer", 1000, i.processSpan)
i.Metrics.Gauge("num_span_drained_from_peer", count)

case sp, ok := <-i.incoming:
Expand All @@ -428,7 +428,7 @@ func (i *InMemCollector) collect() {
span.End()
return
}
count := drainSpanQueue(ctx, sp, i.incoming, "incoming", i.processSpan)
count := drainSpanQueue(ctx, sp, i.incoming, "incoming", 1000, i.processSpan)
i.Metrics.Gauge("num_span_drained_from_incoming", count)
case msg, ok := <-i.dropDecisionMessages:
if !ok {
Expand Down Expand Up @@ -464,15 +464,23 @@ func (i *InMemCollector) collect() {
}
}

func drainSpanQueue(ctx context.Context, span *types.Span, ch <-chan *types.Span, queueName string, processSpanFunc func(context.Context, *types.Span, string)) int {
func drainSpanQueue(ctx context.Context, span *types.Span, ch <-chan *types.Span, queueName string, limit int, processSpanFunc func(context.Context, *types.Span, string)) int {
// process the original span
processSpanFunc(ctx, span, queueName)
count := 1

// let't try to process as many spans as we can in the next 100ms
// TODO: make timer configurable?
timer := time.NewTimer(time.Millisecond * 100)
defer timer.Stop()

for {

// if we've processed enough spans, we should return
if limit != 0 && count >= limit {
return count
}

select {
case <-timer.C:
// we've spent enough time processing spans
Expand Down Expand Up @@ -1568,6 +1576,7 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis

if td.Kept {
select {
case <-i.done:
case i.keptDecisionBuffer <- td:
default:
i.Metrics.Increment("collector_kept_decisions_queue_full")
Expand All @@ -1576,6 +1585,7 @@ func (i *InMemCollector) publishTraceDecision(ctx context.Context, td TraceDecis
return
} else {
select {
case <-i.done:
case i.dropDecisionBuffer <- td:
default:
i.Metrics.Increment("collector_drop_decisions_queue_full")
Expand Down

0 comments on commit 9ff7bb2

Please sign in to comment.