Skip to content

Commit

Permalink
fix: revert draining logic for incoming and peer queue
Browse files Browse the repository at this point in the history
  • Loading branch information
VinozzZ committed Nov 20, 2024
1 parent 9ff7bb2 commit 1962b8e
Showing 1 changed file with 44 additions and 73 deletions.
117 changes: 44 additions & 73 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,6 @@ var inMemCollectorMetrics = []metrics.Metadata{
{Name: "kept_decisions_received", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "total number of kept decisions received"},
{Name: "collector_kept_decisions_queue_full", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of times kept trace decision queue is full"},
{Name: "collector_drop_decisions_queue_full", Type: metrics.Counter, Unit: metrics.Dimensionless, Description: "number of times drop trace decision queue is full"},
{Name: "num_span_drained_from_incoming", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of spans drained from incoming queue"},
{Name: "num_span_drained_from_peer", Type: metrics.Gauge, Unit: metrics.Dimensionless, Description: "number of spans drained from peer queue"},
}

func (i *InMemCollector) Start() error {
Expand Down Expand Up @@ -419,85 +417,58 @@ func (i *InMemCollector) collect() {
span.End()
return
}
count := drainSpanQueue(ctx, sp, i.fromPeer, "peer", 1000, i.processSpan)
i.Metrics.Gauge("num_span_drained_from_peer", count)

case sp, ok := <-i.incoming:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
}
count := drainSpanQueue(ctx, sp, i.incoming, "incoming", 1000, i.processSpan)
i.Metrics.Gauge("num_span_drained_from_incoming", count)
case msg, ok := <-i.dropDecisionMessages:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
}
i.processTraceDecisions(msg, dropDecision)
case msg, ok := <-i.keptDecisionMessages:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
i.processSpan(ctx, sp, "peer")
default:
select {
case msg, ok := <-i.dropDecisionMessages:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
}
i.processTraceDecisions(msg, dropDecision)
case msg, ok := <-i.keptDecisionMessages:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
}
i.processTraceDecisions(msg, keptDecision)
case <-ticker.C:
i.sendExpiredTracesInCache(ctx, i.Clock.Now())
i.checkAlloc(ctx)

// maybe only do this if in test mode?
// Briefly unlock the cache, to allow test access.
_, goSchedSpan := otelutil.StartSpan(ctx, i.Tracer, "Gosched")
i.mutex.Unlock()
runtime.Gosched()
i.mutex.Lock()
goSchedSpan.End()
case sp, ok := <-i.incoming:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
}
i.processSpan(ctx, sp, "incoming")
case sp, ok := <-i.fromPeer:
if !ok {
// channel's been closed; we should shut down.
span.End()
return
}
i.processSpan(ctx, sp, "peer")
case <-i.reload:
i.reloadConfigs()
}
i.processTraceDecisions(msg, keptDecision)
case <-ticker.C:
i.sendExpiredTracesInCache(ctx, i.Clock.Now())
i.checkAlloc(ctx)

// maybe only do this if in test mode?
// Briefly unlock the cache, to allow test access.
_, goSchedSpan := otelutil.StartSpan(ctx, i.Tracer, "Gosched")
i.mutex.Unlock()
runtime.Gosched()
i.mutex.Lock()
goSchedSpan.End()
case <-i.reload:
i.reloadConfigs()
}

i.Metrics.Histogram("collector_collect_loop_duration_ms", float64(time.Now().Sub(startTime).Milliseconds()))
span.End()
}
}

// drainSpanQueue handles span and then keeps receiving from ch, passing each
// span to processSpanFunc together with ctx and queueName.
//
// It stops as soon as any of the following holds:
//   - limit is non-zero and that many spans have been processed,
//   - the 100ms timer started after the first span has fired and is selected,
//   - ch has been closed,
//   - ch has nothing ready to receive.
//
// The return value is the number of spans processed, including the initial
// span, so it is always at least 1.
func drainSpanQueue(ctx context.Context, span *types.Span, ch <-chan *types.Span, queueName string, limit int, processSpanFunc func(context.Context, *types.Span, string)) int {
	// The span handed in by the caller is always processed, whatever the limit.
	processSpanFunc(ctx, span, queueName)
	processed := 1

	// Bound the time spent draining to roughly the next 100ms.
	// TODO: make timer configurable?
	budget := time.NewTimer(100 * time.Millisecond)
	defer budget.Stop()

	// A limit of 0 disables the count cap; only the timer or an empty/closed
	// channel ends the loop in that case.
	for limit == 0 || processed < limit {
		select {
		case <-budget.C:
			// Time budget used up.
			return processed
		case next, open := <-ch:
			if !open {
				return processed
			}
			processSpanFunc(ctx, next, queueName)
			processed++
		default:
			// Nothing is waiting on the channel right now.
			return processed
		}
	}

	// Count cap reached.
	return processed
}

func (i *InMemCollector) redistributeTraces(ctx context.Context) {
ctx, span := otelutil.StartSpan(ctx, i.Tracer, "redistributeTraces")
redistrubutionStartTime := i.Clock.Now()
Expand Down

0 comments on commit 1962b8e

Please sign in to comment.