From f8388c8940349eca0caebbeaace6818780b0fb6a Mon Sep 17 00:00:00 2001 From: Mike Goldsmith Date: Mon, 16 Sep 2024 17:06:28 +0100 Subject: [PATCH] fix: Use peer transmission during redistribute and shutdown events (#1332) ## Which problem is this PR solving? Updates the collector use the configured peer transmission to send spans to peers during redistribute and shutdown events. This ensures metrics for sending to Honeycomb and peers are accurate. The peer transmission is already configured and used by the router when receiving spans that should be sent to a peer. This is used the same transmission object after a span was originally accepted but then should be forwarded to a peer. - Closes #1324 ## Short description of the changes - Inject the already configured peer transmission during collector init - Update redistribute and shutdown logic to use the peer transmission - Update tests throughout to verify the peer transmission is used correctly and also verify it can be injected correctly - note: there were existing tests to verify redistribute and shutdown behaviour and have been updated --- collect/collect.go | 15 ++--- collect/collect_test.go | 131 ++++++++++++++++++++++++++-------------- 2 files changed, 95 insertions(+), 51 deletions(-) diff --git a/collect/collect.go b/collect/collect.go index 20002b8211..b88bbe18c1 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -67,11 +67,12 @@ type InMemCollector struct { Health health.Recorder `inject:""` Sharder sharder.Sharder `inject:""` - Transmission transmit.Transmission `inject:"upstreamTransmission"` - Metrics metrics.Metrics `inject:"genericMetrics"` - SamplerFactory *sample.SamplerFactory `inject:""` - StressRelief StressReliever `inject:"stressRelief"` - Peers peer.Peers `inject:""` + Transmission transmit.Transmission `inject:"upstreamTransmission"` + PeerTransmission transmit.Transmission `inject:"peerTransmission"` + Metrics metrics.Metrics `inject:"genericMetrics"` + SamplerFactory *sample.SamplerFactory `inject:""` + StressRelief StressReliever `inject:"stressRelief"` + Peers peer.Peers `inject:""` // For test use only BlockOnAddSpan bool @@ -433,7 +434,7 @@ func (i *InMemCollector) redistributeTraces() { sp.Data["meta.refinery.forwarded"] = i.hostname } - i.Transmission.EnqueueSpan(sp) + i.PeerTransmission.EnqueueSpan(sp) } forwardedTraces.Add(trace.TraceID) @@ -1033,7 +1034,7 @@ func (i *InMemCollector) sendSpansOnShutdown(ctx context.Context, sentSpanChan < sp.Data["meta.refinery.forwarded"] = i.hostname } - i.Transmission.EnqueueSpan(sp) + i.PeerTransmission.EnqueueSpan(sp) _, exist := forwardedTraces[sp.TraceID] if !exist { forwardedTraces[sp.TraceID] = struct{}{} diff --git a/collect/collect_test.go b/collect/collect_test.go index fac99fa6c0..68ee4020f9 100644 --- a/collect/collect_test.go +++ b/collect/collect_test.go @@ -40,7 +40,7 @@ func newCache() (cache.TraceSentCache, error) { return cache.NewCuckooSentCache(cfg, &metrics.NullMetrics{}) } -func newTestCollector(conf config.Config, transmission transmit.Transmission) *InMemCollector { +func newTestCollector(conf config.Config, transmission transmit.Transmission, peerTransmission transmit.Transmission) *InMemCollector { s := &metrics.MockMetrics{} s.Start() clock := clockwork.NewRealClock() @@ -50,14 +50,15 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission) *I healthReporter.Start() return &InMemCollector{ - Config: conf, - Clock: clock, - Logger: &logger.NullLogger{}, - Tracer: noop.NewTracerProvider().Tracer("test"), - Health: healthReporter, - Transmission: transmission, - Metrics: &metrics.NullMetrics{}, - StressRelief: &MockStressReliever{}, + Config: conf, + Clock: clock, + Logger: &logger.NullLogger{}, + Tracer: noop.NewTracerProvider().Tracer("test"), + Health: healthReporter, + Transmission: transmission, + PeerTransmission: peerTransmission, + Metrics: &metrics.NullMetrics{}, + StressRelief: &MockStressReliever{}, SamplerFactory: &sample.SamplerFactory{ Config: conf, Metrics: s, @@ -94,7 +95,9 @@ func TestAddRootSpan(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -176,7 +179,9 @@ func TestOriginalSampleRateIsNotedInMetaField(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -264,7 +269,9 @@ func TestTransmittedSpansShouldHaveASampleRateOfAtLeastOne(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -329,7 +336,9 @@ func TestAddSpan(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -399,7 +408,9 @@ func TestDryRunMode(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) samplerFactory := &sample.SamplerFactory{ Config: conf, @@ -538,7 +549,9 @@ func TestCacheSizeReload(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) coll.Peers = &peer.MockPeers{} err := coll.Start() @@ -613,7 +626,9 @@ func TestSampleConfigReload(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) err := coll.Start() assert.NoError(t, err) @@ -684,7 +699,9 @@ func TestStableMaxAlloc(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) spandata := make([]map[string]interface{}, 500) for i := 0; i < 500; i++ { @@ -781,7 +798,9 @@ func TestAddSpanNoBlock(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(10, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -826,6 +845,7 @@ func TestDependencyInjection(t *testing.T) { &inject.Object{Value: &health.Health{}}, &inject.Object{Value: &sharder.SingleServerSharder{}}, &inject.Object{Value: &transmit.MockTransmission{}, Name: "upstreamTransmission"}, + &inject.Object{Value: &transmit.MockTransmission{}, Name: "peerTransmission"}, &inject.Object{Value: &metrics.NullMetrics{}, Name: "genericMetrics"}, &inject.Object{Value: &sample.SamplerFactory{}}, &inject.Object{Value: &MockStressReliever{}, Name: "stressRelief"}, @@ -862,7 +882,9 @@ func TestAddCountsToRoot(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -949,7 +971,9 @@ func TestLateRootGetsCounts(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -1036,7 +1060,9 @@ func TestAddSpanCount(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -1107,7 +1133,9 @@ func TestLateRootGetsSpanCount(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -1182,7 +1210,9 @@ func TestLateSpanNotDecorated(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -1249,7 +1279,9 @@ func TestAddAdditionalAttributes(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -1313,7 +1345,9 @@ func TestStressReliefSampleRate(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) stc, err := newCache() assert.NoError(t, err, "lru cache should start") @@ -1400,7 +1434,9 @@ func TestStressReliefDecorateHostname(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) coll.hostname = "host123" c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) @@ -1503,7 +1539,9 @@ func TestSpanWithRuleReasons(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -1671,7 +1709,9 @@ func TestRedistributeTraces(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) s := &sharder.MockSharder{ Self: &sharder.TestShard{Addr: "api1"}, } @@ -1702,7 +1742,7 @@ func TestRedistributeTraces(t *testing.T) { return len(transmission.Events) == 1 && transmission.Events[0].APIHost == "api1" }, conf.GetTracesConfig().GetTraceTimeout()*2, conf.GetTracesConfig().GetSendTickerValue()) - transmission.Flush() + peerTransmission.Flush() s.Other = &sharder.TestShard{Addr: "api2"} span = &types.Span{ @@ -1726,13 +1766,13 @@ func TestRedistributeTraces(t *testing.T) { coll.Peers.RegisterUpdatedPeersCallback(coll.redistributeTimer.Reset) assert.Eventually(t, func() bool { - transmission.Mux.Lock() - defer transmission.Mux.Unlock() - if len(transmission.Events) == 0 { + peerTransmission.Mux.Lock() + defer peerTransmission.Mux.Unlock() + if len(peerTransmission.Events) == 0 { return false } - return len(transmission.Events) == 1 && transmission.Events[0].APIHost == "api2" + return len(peerTransmission.Events) == 1 && peerTransmission.Events[0].APIHost == "api2" }, conf.GetTracesConfig().GetTraceTimeout()*2, conf.GetTracesConfig().GetSendTickerValue()) } @@ -1754,7 +1794,9 @@ func TestDrainTracesOnShutdown(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) coll.hostname = "host123" coll.Sharder = &sharder.MockSharder{ Self: &sharder.TestShard{Addr: "api1"}, @@ -1797,10 +1839,9 @@ func TestDrainTracesOnShutdown(t *testing.T) { go coll.sendSpansOnShutdown(ctx1, sentTraceChan, forwardTraceChan) require.EventuallyWithT(t, func(collect *assert.CollectT) { transmission.Mux.Lock() - events := transmission.Events - require.Len(collect, events, 1) - require.Equal(collect, span1.Dataset, events[0].Dataset) - transmission.Mux.Unlock() + defer transmission.Mux.Unlock() + require.Len(collect, transmission.Events, 1) + require.Equal(collect, span1.Dataset, transmission.Events[0].Dataset) }, 2*time.Second, 100*time.Millisecond) cancel1() @@ -1824,11 +1865,11 @@ func TestDrainTracesOnShutdown(t *testing.T) { ctx2, cancel2 := context.WithCancel(context.Background()) go coll.sendSpansOnShutdown(ctx2, sentTraceChan, forwardTraceChan) require.EventuallyWithT(t, func(collect *assert.CollectT) { - transmission.Mux.Lock() - require.Len(collect, transmission.Events, 1) - require.Equal(collect, span2.Dataset, transmission.Events[0].Dataset) - require.Equal(collect, "api2", transmission.Events[0].APIHost) - transmission.Mux.Unlock() + peerTransmission.Mux.Lock() + defer peerTransmission.Mux.Unlock() + require.Len(collect, peerTransmission.Events, 1) + require.Equal(collect, span2.Dataset, peerTransmission.Events[0].Dataset) + require.Equal(collect, "api2", peerTransmission.Events[0].APIHost) }, 2*time.Second, 100*time.Millisecond) cancel2() } @@ -1852,7 +1893,9 @@ func TestBigTracesGoEarly(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c