From 26dbbe505d02f147cbdcf2edd96457435b63611b Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Thu, 15 Aug 2024 15:00:50 -0400 Subject: [PATCH 1/2] fix: allow draining traces when only 1 peer left --- collect/collect.go | 37 +++++++++++++++++++++++++------------ collect/collect_test.go | 2 +- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/collect/collect.go b/collect/collect.go index 8a843f4d71..8b632b319e 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -124,8 +124,9 @@ func (i *InMemCollector) Start() error { i.Metrics.Register("trace_send_has_root", "counter") i.Metrics.Register("trace_send_no_root", "counter") i.Metrics.Register("trace_forwarded_on_peer_change", "gauge") - i.Metrics.Register("trace_send_on_shutdown", "gauge") - i.Metrics.Register("trace_forwarded_on_shutdown", "gauge") + i.Metrics.Register("trace_send_on_shutdown", "counter") + i.Metrics.Register("trace_forwarded_on_shutdown", "counter") + i.Metrics.Register("trace_redistribution_count", "counter") i.Metrics.Register(TraceSendGotRoot, "counter") i.Metrics.Register(TraceSendExpired, "counter") @@ -152,7 +153,7 @@ func (i *InMemCollector) Start() error { i.done = make(chan struct{}) i.datasetSamplers = make(map[string]sample.Sampler) i.done = make(chan struct{}) - i.redistributeTimer = newRedistributeNotifier(i.Logger, i.Clock) + i.redistributeTimer = newRedistributeNotifier(i.Logger, i.Metrics, i.Clock) if i.Config.GetAddHostMetadataToTrace() { if hostname, err := os.Hostname(); err == nil && hostname != "" { @@ -401,7 +402,7 @@ func (i *InMemCollector) redistributeTraces() { return } numOfPeers := len(peers) - if numOfPeers <= 1 { + if numOfPeers == 0 { return } @@ -856,7 +857,7 @@ func (i *InMemCollector) Stop() error { if err != nil { i.Logger.Error().Logf("unable to get peer list with error %s", err.Error()) } - if len(peers) > 1 { + if len(peers) > 0 { i.sendTracesOnShutdown() } } @@ -979,10 +980,6 @@ func (i *InMemCollector) distributeSpansOnShutdown(sentSpanChan chan sentRecord, func (i *InMemCollector) sendSpansOnShutdown(ctx context.Context, sentSpanChan <-chan sentRecord, forwardSpanChan <-chan *types.Span) { sentTraces := make(map[string]struct{}) forwardedTraces := make(map[string]struct{}) - defer func() { - i.Metrics.Gauge("trace_send_on_shutdown", len(sentTraces)) - i.Metrics.Gauge("trace_forwarded_on_shutdown", len(forwardedTraces)) - }() for { select { @@ -999,7 +996,12 @@ func (i *InMemCollector) sendSpansOnShutdown(ctx context.Context, sentSpanChan < r.span.Data["meta.refinery.shutdown.send"] = true i.dealWithSentTrace(ctx, r.record, r.reason, r.span) - sentTraces[r.span.TraceID] = struct{}{} + _, exist := sentTraces[r.span.TraceID] + if !exist { + sentTraces[r.span.TraceID] = struct{}{} + i.Metrics.Count("trace_send_on_shutdown", 1) + + } span.End() @@ -1030,7 +1032,13 @@ func (i *InMemCollector) sendSpansOnShutdown(ctx context.Context, sentSpanChan < } i.Transmission.EnqueueSpan(sp) - forwardedTraces[sp.TraceID] = struct{}{} + _, exist := forwardedTraces[sp.TraceID] + if !exist { + forwardedTraces[sp.TraceID] = struct{}{} + i.Metrics.Count("trace_forwarded_on_shutdown", 1) + + } + span.End() } @@ -1050,7 +1058,7 @@ func (i *InMemCollector) addAdditionalAttributes(sp *types.Span) { } } -func newRedistributeNotifier(logger logger.Logger, clock clockwork.Clock) *redistributeNotifier { +func newRedistributeNotifier(logger logger.Logger, metrics metrics.Metrics, clock clockwork.Clock) *redistributeNotifier { r := &redistributeNotifier{ initialDelay: 3 * time.Second, maxDelay: 30 * time.Second, @@ -1058,9 +1066,11 @@ func newRedistributeNotifier(logger logger.Logger, clock clockwork.Clock) *redis done: make(chan struct{}), clock: clock, logger: logger, + metrics: metrics, triggered: make(chan struct{}), reset: make(chan struct{}), } + r.metrics.Register("trace_redistribution_count", "gauge") return r } @@ -1071,6 +1081,7 @@ type redistributeNotifier struct { initialDelay time.Duration maxAttempts int maxDelay time.Duration + metrics metrics.Metrics reset chan struct{} done chan struct{} @@ -1113,6 +1124,7 @@ func (r *redistributeNotifier) run() { // if we've reached the max attempts, reset the backoff and attempts // only when the reset signal is received. if attempts >= r.maxAttempts { + r.metrics.Gauge("trace_redistribution_count", 0) <-r.reset lastBackoff = r.initialDelay attempts = 0 @@ -1124,6 +1136,7 @@ func (r *redistributeNotifier) run() { } attempts++ + r.metrics.Gauge("trace_redistribution_count", attempts) // Calculate the backoff interval using exponential backoff with a base time. backoff := time.Duration(math.Min(float64(lastBackoff)*2, float64(r.maxDelay))) diff --git a/collect/collect_test.go b/collect/collect_test.go index e9541b2c7c..79349ffbc9 100644 --- a/collect/collect_test.go +++ b/collect/collect_test.go @@ -72,7 +72,7 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission) *I Addr: "api1", }, }, - redistributeTimer: newRedistributeNotifier(&logger.NullLogger{}, clock), + redistributeTimer: newRedistributeNotifier(&logger.NullLogger{}, &metrics.NullMetrics{}, clock), } } From 1cca0a757b041abb0f9ab05418c0677385984ed0 Mon Sep 17 00:00:00 2001 From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com> Date: Thu, 15 Aug 2024 15:22:13 -0400 Subject: [PATCH 2/2] remove unused code --- collect/collect.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collect/collect.go b/collect/collect.go index 8b632b319e..2639246636 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -124,9 +124,9 @@ func (i *InMemCollector) Start() error { i.Metrics.Register("trace_send_has_root", "counter") i.Metrics.Register("trace_send_no_root", "counter") i.Metrics.Register("trace_forwarded_on_peer_change", "gauge") + i.Metrics.Register("trace_redistribution_count", "gauge") i.Metrics.Register("trace_send_on_shutdown", "counter") i.Metrics.Register("trace_forwarded_on_shutdown", "counter") - i.Metrics.Register("trace_redistribution_count", "counter") i.Metrics.Register(TraceSendGotRoot, "counter") i.Metrics.Register(TraceSendExpired, "counter")