Skip to content

Commit

Permalink
fix: Use peer transmission during redistribute and shutdown events (#…
Browse files Browse the repository at this point in the history
…1332)

## Which problem is this PR solving?

Updates the collector use the configured peer transmission to send spans
to peers during redistribute and shutdown events. This ensures metrics
for sending to Honeycomb and peers are accurate.

The peer transmission is already configured and used by the router when
receiving spans that should be sent to a peer. This is used the same
transmission object after a span was originally accepted but then should
be forwarded to a peer.

- Closes #1324

## Short description of the changes
- Inject the already configured peer transmission during collector init
- Update redistribute and shutdown logic to use the peer transmission
- Update tests throughout to verify the peer transmission is used
correctly and also verify it can be injected correctly
- note: there were existing tests to verify redistribute and shutdown
behaviour and have been updated
  • Loading branch information
MikeGoldsmith authored Sep 16, 2024
1 parent 45874a2 commit 78f9ffb
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 51 deletions.
15 changes: 8 additions & 7 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ type InMemCollector struct {
Health health.Recorder `inject:""`
Sharder sharder.Sharder `inject:""`

Transmission transmit.Transmission `inject:"upstreamTransmission"`
Metrics metrics.Metrics `inject:"genericMetrics"`
SamplerFactory *sample.SamplerFactory `inject:""`
StressRelief StressReliever `inject:"stressRelief"`
Peers peer.Peers `inject:""`
Transmission transmit.Transmission `inject:"upstreamTransmission"`
PeerTransmission transmit.Transmission `inject:"peerTransmission"`
Metrics metrics.Metrics `inject:"genericMetrics"`
SamplerFactory *sample.SamplerFactory `inject:""`
StressRelief StressReliever `inject:"stressRelief"`
Peers peer.Peers `inject:""`

// For test use only
BlockOnAddSpan bool
Expand Down Expand Up @@ -431,7 +432,7 @@ func (i *InMemCollector) redistributeTraces() {
sp.Data["meta.refinery.forwarded"] = i.hostname
}

i.Transmission.EnqueueSpan(sp)
i.PeerTransmission.EnqueueSpan(sp)
}

forwardedTraces.Add(trace.TraceID)
Expand Down Expand Up @@ -1031,7 +1032,7 @@ func (i *InMemCollector) sendSpansOnShutdown(ctx context.Context, sentSpanChan <
sp.Data["meta.refinery.forwarded"] = i.hostname
}

i.Transmission.EnqueueSpan(sp)
i.PeerTransmission.EnqueueSpan(sp)
_, exist := forwardedTraces[sp.TraceID]
if !exist {
forwardedTraces[sp.TraceID] = struct{}{}
Expand Down
131 changes: 87 additions & 44 deletions collect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func newCache() (cache.TraceSentCache, error) {
return cache.NewCuckooSentCache(cfg, &metrics.NullMetrics{})
}

func newTestCollector(conf config.Config, transmission transmit.Transmission) *InMemCollector {
func newTestCollector(conf config.Config, transmission transmit.Transmission, peerTransmission transmit.Transmission) *InMemCollector {
s := &metrics.MockMetrics{}
s.Start()
clock := clockwork.NewRealClock()
Expand All @@ -50,14 +50,15 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission) *I
healthReporter.Start()

return &InMemCollector{
Config: conf,
Clock: clock,
Logger: &logger.NullLogger{},
Tracer: noop.NewTracerProvider().Tracer("test"),
Health: healthReporter,
Transmission: transmission,
Metrics: &metrics.NullMetrics{},
StressRelief: &MockStressReliever{},
Config: conf,
Clock: clock,
Logger: &logger.NullLogger{},
Tracer: noop.NewTracerProvider().Tracer("test"),
Health: healthReporter,
Transmission: transmission,
PeerTransmission: peerTransmission,
Metrics: &metrics.NullMetrics{},
StressRelief: &MockStressReliever{},
SamplerFactory: &sample.SamplerFactory{
Config: conf,
Metrics: s,
Expand Down Expand Up @@ -94,7 +95,9 @@ func TestAddRootSpan(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -176,7 +179,9 @@ func TestOriginalSampleRateIsNotedInMetaField(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -264,7 +269,9 @@ func TestTransmittedSpansShouldHaveASampleRateOfAtLeastOne(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -329,7 +336,9 @@ func TestAddSpan(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -399,7 +408,9 @@ func TestDryRunMode(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

samplerFactory := &sample.SamplerFactory{
Config: conf,
Expand Down Expand Up @@ -538,7 +549,9 @@ func TestCacheSizeReload(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)
coll.Peers = &peer.MockPeers{}

err := coll.Start()
Expand Down Expand Up @@ -613,7 +626,9 @@ func TestSampleConfigReload(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

err := coll.Start()
assert.NoError(t, err)
Expand Down Expand Up @@ -684,7 +699,9 @@ func TestStableMaxAlloc(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

spandata := make([]map[string]interface{}, 500)
for i := 0; i < 500; i++ {
Expand Down Expand Up @@ -781,7 +798,9 @@ func TestAddSpanNoBlock(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(10, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -826,6 +845,7 @@ func TestDependencyInjection(t *testing.T) {
&inject.Object{Value: &health.Health{}},
&inject.Object{Value: &sharder.SingleServerSharder{}},
&inject.Object{Value: &transmit.MockTransmission{}, Name: "upstreamTransmission"},
&inject.Object{Value: &transmit.MockTransmission{}, Name: "peerTransmission"},
&inject.Object{Value: &metrics.NullMetrics{}, Name: "genericMetrics"},
&inject.Object{Value: &sample.SamplerFactory{}},
&inject.Object{Value: &MockStressReliever{}, Name: "stressRelief"},
Expand Down Expand Up @@ -862,7 +882,9 @@ func TestAddCountsToRoot(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -949,7 +971,9 @@ func TestLateRootGetsCounts(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -1036,7 +1060,9 @@ func TestAddSpanCount(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -1107,7 +1133,9 @@ func TestLateRootGetsSpanCount(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -1182,7 +1210,9 @@ func TestLateSpanNotDecorated(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -1249,7 +1279,9 @@ func TestAddAdditionalAttributes(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -1313,7 +1345,9 @@ func TestStressReliefSampleRate(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

stc, err := newCache()
assert.NoError(t, err, "lru cache should start")
Expand Down Expand Up @@ -1400,7 +1434,9 @@ func TestStressReliefDecorateHostname(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

coll.hostname = "host123"
c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
Expand Down Expand Up @@ -1503,7 +1539,9 @@ func TestSpanWithRuleReasons(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -1671,7 +1709,9 @@ func TestRedistributeTraces(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)
s := &sharder.MockSharder{
Self: &sharder.TestShard{Addr: "api1"},
}
Expand Down Expand Up @@ -1702,7 +1742,7 @@ func TestRedistributeTraces(t *testing.T) {

return len(transmission.Events) == 1 && transmission.Events[0].APIHost == "api1"
}, conf.GetTracesConfig().GetTraceTimeout()*2, conf.GetTracesConfig().GetSendTickerValue())
transmission.Flush()
peerTransmission.Flush()

s.Other = &sharder.TestShard{Addr: "api2"}
span = &types.Span{
Expand All @@ -1726,13 +1766,13 @@ func TestRedistributeTraces(t *testing.T) {
coll.Peers.RegisterUpdatedPeersCallback(coll.redistributeTimer.Reset)

assert.Eventually(t, func() bool {
transmission.Mux.Lock()
defer transmission.Mux.Unlock()
if len(transmission.Events) == 0 {
peerTransmission.Mux.Lock()
defer peerTransmission.Mux.Unlock()
if len(peerTransmission.Events) == 0 {
return false
}

return len(transmission.Events) == 1 && transmission.Events[0].APIHost == "api2"
return len(peerTransmission.Events) == 1 && peerTransmission.Events[0].APIHost == "api2"
}, conf.GetTracesConfig().GetTraceTimeout()*2, conf.GetTracesConfig().GetSendTickerValue())
}

Expand All @@ -1754,7 +1794,9 @@ func TestDrainTracesOnShutdown(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)
coll.hostname = "host123"
coll.Sharder = &sharder.MockSharder{
Self: &sharder.TestShard{Addr: "api1"},
Expand Down Expand Up @@ -1797,10 +1839,9 @@ func TestDrainTracesOnShutdown(t *testing.T) {
go coll.sendSpansOnShutdown(ctx1, sentTraceChan, forwardTraceChan)
require.EventuallyWithT(t, func(collect *assert.CollectT) {
transmission.Mux.Lock()
events := transmission.Events
require.Len(collect, events, 1)
require.Equal(collect, span1.Dataset, events[0].Dataset)
transmission.Mux.Unlock()
defer transmission.Mux.Unlock()
require.Len(collect, transmission.Events, 1)
require.Equal(collect, span1.Dataset, transmission.Events[0].Dataset)
}, 2*time.Second, 100*time.Millisecond)

cancel1()
Expand All @@ -1824,11 +1865,11 @@ func TestDrainTracesOnShutdown(t *testing.T) {
ctx2, cancel2 := context.WithCancel(context.Background())
go coll.sendSpansOnShutdown(ctx2, sentTraceChan, forwardTraceChan)
require.EventuallyWithT(t, func(collect *assert.CollectT) {
transmission.Mux.Lock()
require.Len(collect, transmission.Events, 1)
require.Equal(collect, span2.Dataset, transmission.Events[0].Dataset)
require.Equal(collect, "api2", transmission.Events[0].APIHost)
transmission.Mux.Unlock()
peerTransmission.Mux.Lock()
defer peerTransmission.Mux.Unlock()
require.Len(collect, peerTransmission.Events, 1)
require.Equal(collect, span2.Dataset, peerTransmission.Events[0].Dataset)
require.Equal(collect, "api2", peerTransmission.Events[0].APIHost)
}, 2*time.Second, 100*time.Millisecond)
cancel2()
}
Expand All @@ -1852,7 +1893,9 @@ func TestBigTracesGoEarly(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down

0 comments on commit 78f9ffb

Please sign in to comment.