From b3ec4e91a879e7385411f2efcc3d56714139735a Mon Sep 17 00:00:00 2001 From: Mike Goldsmth Date: Mon, 16 Sep 2024 14:31:16 +0100 Subject: [PATCH] feat: Use peer transmission during redistribution and shutdown --- collect/collect.go | 15 ++--- collect/collect_test.go | 131 ++++++++++++++++++++++++++-------------- 2 files changed, 95 insertions(+), 51 deletions(-) diff --git a/collect/collect.go b/collect/collect.go index fcd6430015..2f9b045a47 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -67,11 +67,12 @@ type InMemCollector struct { Health health.Recorder `inject:""` Sharder sharder.Sharder `inject:""` - Transmission transmit.Transmission `inject:"upstreamTransmission"` - Metrics metrics.Metrics `inject:"genericMetrics"` - SamplerFactory *sample.SamplerFactory `inject:""` - StressRelief StressReliever `inject:"stressRelief"` - Peers peer.Peers `inject:""` + Transmission transmit.Transmission `inject:"upstreamTransmission"` + PeerTransmission transmit.Transmission `inject:"peerTransmission"` + Metrics metrics.Metrics `inject:"genericMetrics"` + SamplerFactory *sample.SamplerFactory `inject:""` + StressRelief StressReliever `inject:"stressRelief"` + Peers peer.Peers `inject:""` // For test use only BlockOnAddSpan bool @@ -431,7 +432,7 @@ func (i *InMemCollector) redistributeTraces() { sp.Data["meta.refinery.forwarded"] = i.hostname } - i.Transmission.EnqueueSpan(sp) + i.PeerTransmission.EnqueueSpan(sp) } forwardedTraces.Add(trace.TraceID) @@ -1031,7 +1032,7 @@ func (i *InMemCollector) sendSpansOnShutdown(ctx context.Context, sentSpanChan < sp.Data["meta.refinery.forwarded"] = i.hostname } - i.Transmission.EnqueueSpan(sp) + i.PeerTransmission.EnqueueSpan(sp) _, exist := forwardedTraces[sp.TraceID] if !exist { forwardedTraces[sp.TraceID] = struct{}{} diff --git a/collect/collect_test.go b/collect/collect_test.go index fac99fa6c0..68ee4020f9 100644 --- a/collect/collect_test.go +++ b/collect/collect_test.go @@ -40,7 +40,7 @@ func newCache() (cache.TraceSentCache, error) { return cache.NewCuckooSentCache(cfg, &metrics.NullMetrics{}) } -func newTestCollector(conf config.Config, transmission transmit.Transmission) *InMemCollector { +func newTestCollector(conf config.Config, transmission transmit.Transmission, peerTransmission transmit.Transmission) *InMemCollector { s := &metrics.MockMetrics{} s.Start() clock := clockwork.NewRealClock() @@ -50,14 +50,15 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission) *I healthReporter.Start() return &InMemCollector{ - Config: conf, - Clock: clock, - Logger: &logger.NullLogger{}, - Tracer: noop.NewTracerProvider().Tracer("test"), - Health: healthReporter, - Transmission: transmission, - Metrics: &metrics.NullMetrics{}, - StressRelief: &MockStressReliever{}, + Config: conf, + Clock: clock, + Logger: &logger.NullLogger{}, + Tracer: noop.NewTracerProvider().Tracer("test"), + Health: healthReporter, + Transmission: transmission, + PeerTransmission: peerTransmission, + Metrics: &metrics.NullMetrics{}, + StressRelief: &MockStressReliever{}, SamplerFactory: &sample.SamplerFactory{ Config: conf, Metrics: s, @@ -94,7 +95,9 @@ func TestAddRootSpan(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -176,7 +179,9 @@ func TestOriginalSampleRateIsNotedInMetaField(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -264,7 +269,9 @@ func TestTransmittedSpansShouldHaveASampleRateOfAtLeastOne(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -329,7 +336,9 @@ func TestAddSpan(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -399,7 +408,9 @@ func TestDryRunMode(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) samplerFactory := &sample.SamplerFactory{ Config: conf, @@ -538,7 +549,9 @@ func TestCacheSizeReload(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) coll.Peers = &peer.MockPeers{} err := coll.Start() @@ -613,7 +626,9 @@ func TestSampleConfigReload(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) err := coll.Start() assert.NoError(t, err) @@ -684,7 +699,9 @@ func TestStableMaxAlloc(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) spandata := make([]map[string]interface{}, 500) for i := 0; i < 500; i++ { @@ -781,7 +798,9 @@ func TestAddSpanNoBlock(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(10, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -826,6 +845,7 @@ func TestDependencyInjection(t *testing.T) { &inject.Object{Value: &health.Health{}}, &inject.Object{Value: &sharder.SingleServerSharder{}}, &inject.Object{Value: &transmit.MockTransmission{}, Name: "upstreamTransmission"}, + &inject.Object{Value: &transmit.MockTransmission{}, Name: "peerTransmission"}, &inject.Object{Value: &metrics.NullMetrics{}, Name: "genericMetrics"}, &inject.Object{Value: &sample.SamplerFactory{}}, &inject.Object{Value: &MockStressReliever{}, Name: "stressRelief"}, @@ -862,7 +882,9 @@ func TestAddCountsToRoot(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -949,7 +971,9 @@ func TestLateRootGetsCounts(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -1036,7 +1060,9 @@ func TestAddSpanCount(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -1107,7 +1133,9 @@ func TestLateRootGetsSpanCount(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -1182,7 +1210,9 @@ func TestLateSpanNotDecorated(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -1249,7 +1279,9 @@ func TestAddAdditionalAttributes(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -1313,7 +1345,9 @@ func TestStressReliefSampleRate(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) stc, err := newCache() assert.NoError(t, err, "lru cache should start") @@ -1400,7 +1434,9 @@ func TestStressReliefDecorateHostname(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) coll.hostname = "host123" c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) @@ -1503,7 +1539,9 @@ func TestSpanWithRuleReasons(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c @@ -1671,7 +1709,9 @@ func TestRedistributeTraces(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) s := &sharder.MockSharder{ Self: &sharder.TestShard{Addr: "api1"}, } @@ -1702,7 +1742,7 @@ func TestRedistributeTraces(t *testing.T) { return len(transmission.Events) == 1 && transmission.Events[0].APIHost == "api1" }, conf.GetTracesConfig().GetTraceTimeout()*2, conf.GetTracesConfig().GetSendTickerValue()) - transmission.Flush() + peerTransmission.Flush() s.Other = &sharder.TestShard{Addr: "api2"} span = &types.Span{ @@ -1726,13 +1766,13 @@ func TestRedistributeTraces(t *testing.T) { coll.Peers.RegisterUpdatedPeersCallback(coll.redistributeTimer.Reset) assert.Eventually(t, func() bool { - transmission.Mux.Lock() - defer transmission.Mux.Unlock() - if len(transmission.Events) == 0 { + peerTransmission.Mux.Lock() + defer peerTransmission.Mux.Unlock() + if len(peerTransmission.Events) == 0 { return false } - return len(transmission.Events) == 1 && transmission.Events[0].APIHost == "api2" + return len(peerTransmission.Events) == 1 && peerTransmission.Events[0].APIHost == "api2" }, conf.GetTracesConfig().GetTraceTimeout()*2, conf.GetTracesConfig().GetSendTickerValue()) } @@ -1754,7 +1794,9 @@ func TestDrainTracesOnShutdown(t *testing.T) { } transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) coll.hostname = "host123" coll.Sharder = &sharder.MockSharder{ Self: &sharder.TestShard{Addr: "api1"}, @@ -1797,10 +1839,9 @@ func TestDrainTracesOnShutdown(t *testing.T) { go coll.sendSpansOnShutdown(ctx1, sentTraceChan, forwardTraceChan) require.EventuallyWithT(t, func(collect *assert.CollectT) { transmission.Mux.Lock() - events := transmission.Events - require.Len(collect, events, 1) - require.Equal(collect, span1.Dataset, events[0].Dataset) - transmission.Mux.Unlock() + defer transmission.Mux.Unlock() + require.Len(collect, transmission.Events, 1) + require.Equal(collect, span1.Dataset, transmission.Events[0].Dataset) }, 2*time.Second, 100*time.Millisecond) cancel1() @@ -1824,11 +1865,11 @@ func TestDrainTracesOnShutdown(t *testing.T) { ctx2, cancel2 := context.WithCancel(context.Background()) go coll.sendSpansOnShutdown(ctx2, sentTraceChan, forwardTraceChan) require.EventuallyWithT(t, func(collect *assert.CollectT) { - transmission.Mux.Lock() - require.Len(collect, transmission.Events, 1) - require.Equal(collect, span2.Dataset, transmission.Events[0].Dataset) - require.Equal(collect, "api2", transmission.Events[0].APIHost) - transmission.Mux.Unlock() + peerTransmission.Mux.Lock() + defer peerTransmission.Mux.Unlock() + require.Len(collect, peerTransmission.Events, 1) + require.Equal(collect, span2.Dataset, peerTransmission.Events[0].Dataset) + require.Equal(collect, "api2", peerTransmission.Events[0].APIHost) }, 2*time.Second, 100*time.Millisecond) cancel2() } @@ -1852,7 +1893,9 @@ func TestBigTracesGoEarly(t *testing.T) { transmission := &transmit.MockTransmission{} transmission.Start() - coll := newTestCollector(conf, transmission) + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{}) coll.cache = c