Skip to content

Commit

Permalink
fix: allow draining traces when only 1 peer left
Browse files Browse the repository at this point in the history
  • Loading branch information
VinozzZ committed Aug 15, 2024
1 parent 244e0e4 commit 26dbbe5
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 13 deletions.
37 changes: 25 additions & 12 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ func (i *InMemCollector) Start() error {
i.Metrics.Register("trace_send_has_root", "counter")
i.Metrics.Register("trace_send_no_root", "counter")
i.Metrics.Register("trace_forwarded_on_peer_change", "gauge")
i.Metrics.Register("trace_send_on_shutdown", "gauge")
i.Metrics.Register("trace_forwarded_on_shutdown", "gauge")
i.Metrics.Register("trace_send_on_shutdown", "counter")
i.Metrics.Register("trace_forwarded_on_shutdown", "counter")
i.Metrics.Register("trace_redistribution_count", "counter")

i.Metrics.Register(TraceSendGotRoot, "counter")
i.Metrics.Register(TraceSendExpired, "counter")
Expand All @@ -152,7 +153,7 @@ func (i *InMemCollector) Start() error {
i.done = make(chan struct{})
i.datasetSamplers = make(map[string]sample.Sampler)
i.done = make(chan struct{})
i.redistributeTimer = newRedistributeNotifier(i.Logger, i.Clock)
i.redistributeTimer = newRedistributeNotifier(i.Logger, i.Metrics, i.Clock)

if i.Config.GetAddHostMetadataToTrace() {
if hostname, err := os.Hostname(); err == nil && hostname != "" {
Expand Down Expand Up @@ -401,7 +402,7 @@ func (i *InMemCollector) redistributeTraces() {
return
}
numOfPeers := len(peers)
if numOfPeers <= 1 {
if numOfPeers == 0 {
return
}

Expand Down Expand Up @@ -856,7 +857,7 @@ func (i *InMemCollector) Stop() error {
if err != nil {
i.Logger.Error().Logf("unable to get peer list with error %s", err.Error())
}
if len(peers) > 1 {
if len(peers) > 0 {
i.sendTracesOnShutdown()
}
}
Expand Down Expand Up @@ -979,10 +980,6 @@ func (i *InMemCollector) distributeSpansOnShutdown(sentSpanChan chan sentRecord,
func (i *InMemCollector) sendSpansOnShutdown(ctx context.Context, sentSpanChan <-chan sentRecord, forwardSpanChan <-chan *types.Span) {
sentTraces := make(map[string]struct{})
forwardedTraces := make(map[string]struct{})
defer func() {
i.Metrics.Gauge("trace_send_on_shutdown", len(sentTraces))
i.Metrics.Gauge("trace_forwarded_on_shutdown", len(forwardedTraces))
}()

for {
select {
Expand All @@ -999,7 +996,12 @@ func (i *InMemCollector) sendSpansOnShutdown(ctx context.Context, sentSpanChan <
r.span.Data["meta.refinery.shutdown.send"] = true

i.dealWithSentTrace(ctx, r.record, r.reason, r.span)
sentTraces[r.span.TraceID] = struct{}{}
_, exist := sentTraces[r.span.TraceID]
if !exist {
sentTraces[r.span.TraceID] = struct{}{}
i.Metrics.Count("trace_send_on_shutdown", 1)

}

span.End()

Expand Down Expand Up @@ -1030,7 +1032,13 @@ func (i *InMemCollector) sendSpansOnShutdown(ctx context.Context, sentSpanChan <
}

i.Transmission.EnqueueSpan(sp)
forwardedTraces[sp.TraceID] = struct{}{}
_, exist := forwardedTraces[sp.TraceID]
if !exist {
forwardedTraces[sp.TraceID] = struct{}{}
i.Metrics.Count("trace_forwarded_on_shutdown", 1)

}

span.End()
}

Expand All @@ -1050,17 +1058,19 @@ func (i *InMemCollector) addAdditionalAttributes(sp *types.Span) {
}
}

func newRedistributeNotifier(logger logger.Logger, clock clockwork.Clock) *redistributeNotifier {
func newRedistributeNotifier(logger logger.Logger, metrics metrics.Metrics, clock clockwork.Clock) *redistributeNotifier {
r := &redistributeNotifier{
initialDelay: 3 * time.Second,
maxDelay: 30 * time.Second,
maxAttempts: 5,
done: make(chan struct{}),
clock: clock,
logger: logger,
metrics: metrics,
triggered: make(chan struct{}),
reset: make(chan struct{}),
}
r.metrics.Register("trace_redistribution_count", "gauge")

return r
}
Expand All @@ -1071,6 +1081,7 @@ type redistributeNotifier struct {
initialDelay time.Duration
maxAttempts int
maxDelay time.Duration
metrics metrics.Metrics

reset chan struct{}
done chan struct{}
Expand Down Expand Up @@ -1113,6 +1124,7 @@ func (r *redistributeNotifier) run() {
// if we've reached the max attempts, reset the backoff and attempts
// only when the reset signal is received.
if attempts >= r.maxAttempts {
r.metrics.Gauge("trace_redistribution_count", 0)
<-r.reset
lastBackoff = r.initialDelay
attempts = 0
Expand All @@ -1124,6 +1136,7 @@ func (r *redistributeNotifier) run() {
}

attempts++
r.metrics.Gauge("trace_redistribution_count", attempts)

// Calculate the backoff interval using exponential backoff with a base time.
backoff := time.Duration(math.Min(float64(lastBackoff)*2, float64(r.maxDelay)))
Expand Down
2 changes: 1 addition & 1 deletion collect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission) *I
Addr: "api1",
},
},
redistributeTimer: newRedistributeNotifier(&logger.NullLogger{}, clock),
redistributeTimer: newRedistributeNotifier(&logger.NullLogger{}, &metrics.NullMetrics{}, clock),
}
}

Expand Down

0 comments on commit 26dbbe5

Please sign in to comment.