feat: Try to drain incoming and peer queues for an amount of time (#1440)

## Which problem is this PR solving?

We're seeing that the incoming and peer queues can sometimes build up, and it takes a long time for them to recover. One theory is that because the collect loop's select has so many cases, we should allow multiple messages to be read from the incoming and peer queues whenever one of them is selected to be worked on.

## Short description of the changes
- Add a new utility func that reads messages from a given span channel for up to 100 milliseconds, exiting early if the channel is empty (see the sketch after this list)
- Update the incoming and peer channels to use the new func
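
For illustration, here's a minimal, self-contained sketch of the drain pattern the new func follows. The `drain` helper, the `int` payload, and the `process` callback are illustrative stand-ins, not the real API; the actual implementation is `drainSpanQueue` in the diff below.

```go
package main

import (
	"fmt"
	"time"
)

// drain processes items from ch until the channel is momentarily empty,
// the channel is closed, or the deadline elapses, whichever comes first.
// It returns the number of items processed.
func drain(ch <-chan int, deadline time.Duration, process func(int)) int {
	count := 0
	timer := time.NewTimer(deadline)
	defer timer.Stop()
	for {
		select {
		case <-timer.C:
			// spent enough time; let the caller's select loop run again
			return count
		case v, ok := <-ch:
			if !ok {
				// channel closed
				return count
			}
			process(v)
			count++
		default:
			// channel is empty right now; exit early instead of blocking
			return count
		}
	}
}

func main() {
	ch := make(chan int, 10)
	for i := 0; i < 5; i++ {
		ch <- i
	}
	n := drain(ch, 100*time.Millisecond, func(v int) { fmt.Println("processed", v) })
	fmt.Println("drained", n, "items")
}
```

The `default` case is what keeps the drain non-blocking: if the channel empties before the timer fires, the loop returns immediately instead of waiting out the full 100ms, so the collect loop's other select cases (decision messages, the ticker, reloads) aren't starved longer than necessary.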

---------

Co-authored-by: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com>
MikeGoldsmith and VinozzZ authored Nov 19, 2024
1 parent fa1520c commit fc9f0eb
109 changes: 65 additions & 44 deletions collect/collect.go
@@ -164,6 +164,8 @@ var inMemCollectorMetrics = []metrics.Metadata{
{Name: "kept_decisions_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "total number of kept decisions received"},
{Name: "collector_kept_decisions_queue_full", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of times kept trace decision queue is full"},
{Name: "collector_drop_decisions_queue_full", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of times drop trace decision queue is full"},
{Name: "num_span_drained_from_incoming", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of spans drained from incoming queue"},
{Name: "num_span_drained_from_peer", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of spans drained from peer queue"},
}

func (i *InMemCollector) Start() error {
@@ -417,58 +419,77 @@ func (i *InMemCollector) collect() {
span.End()
return
}
i.processSpan(ctx, sp, "peer")
default:
select {
case msg, ok := <-i.dropDecisionMessages:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
}
i.processTraceDecisions(msg, dropDecision)
case msg, ok := <-i.keptDecisionMessages:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
}
i.processTraceDecisions(msg, keptDecision)
case <-ticker.C:
i.sendExpiredTracesInCache(ctx, i.Clock.Now())
i.checkAlloc(ctx)

// maybe only do this if in test mode?
// Briefly unlock the cache, to allow test access.
_, goSchedSpan := otelutil.StartSpan(ctx, i.Tracer, "Gosched")
i.mutex.Unlock()
runtime.Gosched()
i.mutex.Lock()
goSchedSpan.End()
case sp, ok := <-i.incoming:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
}
i.processSpan(ctx, sp, "incoming")
case sp, ok := <-i.fromPeer:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
}
i.processSpan(ctx, sp, "peer")
case <-i.reload:
i.reloadConfigs()
count := drainSpanQueue(ctx, sp, i.fromPeer, "peer", i.processSpan)
i.Metrics.Gauge("num_span_drained_from_peer", count)

case sp, ok := <-i.incoming:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
}
count := drainSpanQueue(ctx, sp, i.incoming, "incoming", i.processSpan)
i.Metrics.Gauge("num_span_drained_from_incoming", count)
case msg, ok := <-i.dropDecisionMessages:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
}
i.processTraceDecisions(msg, dropDecision)
case msg, ok := <-i.keptDecisionMessages:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
}
i.processTraceDecisions(msg, keptDecision)
case <-ticker.C:
i.sendExpiredTracesInCache(ctx, i.Clock.Now())
i.checkAlloc(ctx)

// maybe only do this if in test mode?
// Briefly unlock the cache, to allow test access.
_, goSchedSpan := otelutil.StartSpan(ctx, i.Tracer, "Gosched")
i.mutex.Unlock()
runtime.Gosched()
i.mutex.Lock()
goSchedSpan.End()
case <-i.reload:
i.reloadConfigs()
}

i.Metrics.Histogram("collector_collect_loop_duration_ms", float64(time.Now().Sub(startTime).Milliseconds()))
span.End()
}
}

func drainSpanQueue(ctx context.Context, span *types.Span, ch <-chan *types.Span, queueName string, processSpanFunc func(context.Context, *types.Span, string)) int {
	// process the original span
	processSpanFunc(ctx, span, queueName)
	count := 1

	// let's try to process as many spans as we can in the next 100ms
	// TODO: make timer configurable?
	timer := time.NewTimer(time.Millisecond * 100)
	for {
		select {
		case <-timer.C:
			// we've spent enough time processing spans
			return count
		case sp, ok := <-ch:
			if !ok {
				// channel's been closed; stop draining
				return count
			}
			processSpanFunc(ctx, sp, queueName)
			count++
		default:
			// nothing else in the channel right now; exit early
			return count
		}
	}
}

func (i *InMemCollector) redistributeTraces(ctx context.Context) {
ctx, span := otelutil.StartSpan(ctx, i.Tracer, "redistributeTraces")
redistrubutionStartTime := i.Clock.Now()
