Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Use peer transmission during redistribute and shutdown events #1332

Merged
merged 2 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ type InMemCollector struct {
Health health.Recorder `inject:""`
Sharder sharder.Sharder `inject:""`

Transmission transmit.Transmission `inject:"upstreamTransmission"`
Metrics metrics.Metrics `inject:"genericMetrics"`
SamplerFactory *sample.SamplerFactory `inject:""`
StressRelief StressReliever `inject:"stressRelief"`
Peers peer.Peers `inject:""`
Transmission transmit.Transmission `inject:"upstreamTransmission"`
PeerTransmission transmit.Transmission `inject:"peerTransmission"`
Metrics metrics.Metrics `inject:"genericMetrics"`
SamplerFactory *sample.SamplerFactory `inject:""`
StressRelief StressReliever `inject:"stressRelief"`
Peers peer.Peers `inject:""`

// For test use only
BlockOnAddSpan bool
Expand Down Expand Up @@ -431,7 +432,7 @@ func (i *InMemCollector) redistributeTraces() {
sp.Data["meta.refinery.forwarded"] = i.hostname
}

i.Transmission.EnqueueSpan(sp)
i.PeerTransmission.EnqueueSpan(sp)
}

forwardedTraces.Add(trace.TraceID)
Expand Down Expand Up @@ -1031,7 +1032,7 @@ func (i *InMemCollector) sendSpansOnShutdown(ctx context.Context, sentSpanChan <
sp.Data["meta.refinery.forwarded"] = i.hostname
}

i.Transmission.EnqueueSpan(sp)
i.PeerTransmission.EnqueueSpan(sp)
_, exist := forwardedTraces[sp.TraceID]
if !exist {
forwardedTraces[sp.TraceID] = struct{}{}
Expand Down
131 changes: 87 additions & 44 deletions collect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func newCache() (cache.TraceSentCache, error) {
return cache.NewCuckooSentCache(cfg, &metrics.NullMetrics{})
}

func newTestCollector(conf config.Config, transmission transmit.Transmission) *InMemCollector {
func newTestCollector(conf config.Config, transmission transmit.Transmission, peerTransmission transmit.Transmission) *InMemCollector {
s := &metrics.MockMetrics{}
s.Start()
clock := clockwork.NewRealClock()
Expand All @@ -50,14 +50,15 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission) *I
healthReporter.Start()

return &InMemCollector{
Config: conf,
Clock: clock,
Logger: &logger.NullLogger{},
Tracer: noop.NewTracerProvider().Tracer("test"),
Health: healthReporter,
Transmission: transmission,
Metrics: &metrics.NullMetrics{},
StressRelief: &MockStressReliever{},
Config: conf,
Clock: clock,
Logger: &logger.NullLogger{},
Tracer: noop.NewTracerProvider().Tracer("test"),
Health: healthReporter,
Transmission: transmission,
PeerTransmission: peerTransmission,
Metrics: &metrics.NullMetrics{},
StressRelief: &MockStressReliever{},
SamplerFactory: &sample.SamplerFactory{
Config: conf,
Metrics: s,
Expand Down Expand Up @@ -94,7 +95,9 @@ func TestAddRootSpan(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -176,7 +179,9 @@ func TestOriginalSampleRateIsNotedInMetaField(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -264,7 +269,9 @@ func TestTransmittedSpansShouldHaveASampleRateOfAtLeastOne(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -329,7 +336,9 @@ func TestAddSpan(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -399,7 +408,9 @@ func TestDryRunMode(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

samplerFactory := &sample.SamplerFactory{
Config: conf,
Expand Down Expand Up @@ -538,7 +549,9 @@ func TestCacheSizeReload(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)
coll.Peers = &peer.MockPeers{}

err := coll.Start()
Expand Down Expand Up @@ -613,7 +626,9 @@ func TestSampleConfigReload(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

err := coll.Start()
assert.NoError(t, err)
Expand Down Expand Up @@ -684,7 +699,9 @@ func TestStableMaxAlloc(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

spandata := make([]map[string]interface{}, 500)
for i := 0; i < 500; i++ {
Expand Down Expand Up @@ -781,7 +798,9 @@ func TestAddSpanNoBlock(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(10, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -826,6 +845,7 @@ func TestDependencyInjection(t *testing.T) {
&inject.Object{Value: &health.Health{}},
&inject.Object{Value: &sharder.SingleServerSharder{}},
&inject.Object{Value: &transmit.MockTransmission{}, Name: "upstreamTransmission"},
&inject.Object{Value: &transmit.MockTransmission{}, Name: "peerTransmission"},
&inject.Object{Value: &metrics.NullMetrics{}, Name: "genericMetrics"},
&inject.Object{Value: &sample.SamplerFactory{}},
&inject.Object{Value: &MockStressReliever{}, Name: "stressRelief"},
Expand Down Expand Up @@ -862,7 +882,9 @@ func TestAddCountsToRoot(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -949,7 +971,9 @@ func TestLateRootGetsCounts(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -1036,7 +1060,9 @@ func TestAddSpanCount(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -1107,7 +1133,9 @@ func TestLateRootGetsSpanCount(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -1182,7 +1210,9 @@ func TestLateSpanNotDecorated(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -1249,7 +1279,9 @@ func TestAddAdditionalAttributes(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -1313,7 +1345,9 @@ func TestStressReliefSampleRate(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

stc, err := newCache()
assert.NoError(t, err, "lru cache should start")
Expand Down Expand Up @@ -1400,7 +1434,9 @@ func TestStressReliefDecorateHostname(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

coll.hostname = "host123"
c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
Expand Down Expand Up @@ -1503,7 +1539,9 @@ func TestSpanWithRuleReasons(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down Expand Up @@ -1671,7 +1709,9 @@ func TestRedistributeTraces(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)
s := &sharder.MockSharder{
Self: &sharder.TestShard{Addr: "api1"},
}
Expand Down Expand Up @@ -1702,7 +1742,7 @@ func TestRedistributeTraces(t *testing.T) {

return len(transmission.Events) == 1 && transmission.Events[0].APIHost == "api1"
}, conf.GetTracesConfig().GetTraceTimeout()*2, conf.GetTracesConfig().GetSendTickerValue())
transmission.Flush()
peerTransmission.Flush()

s.Other = &sharder.TestShard{Addr: "api2"}
span = &types.Span{
Expand All @@ -1726,13 +1766,13 @@ func TestRedistributeTraces(t *testing.T) {
coll.Peers.RegisterUpdatedPeersCallback(coll.redistributeTimer.Reset)

assert.Eventually(t, func() bool {
transmission.Mux.Lock()
defer transmission.Mux.Unlock()
if len(transmission.Events) == 0 {
peerTransmission.Mux.Lock()
defer peerTransmission.Mux.Unlock()
if len(peerTransmission.Events) == 0 {
return false
}

return len(transmission.Events) == 1 && transmission.Events[0].APIHost == "api2"
return len(peerTransmission.Events) == 1 && peerTransmission.Events[0].APIHost == "api2"
}, conf.GetTracesConfig().GetTraceTimeout()*2, conf.GetTracesConfig().GetSendTickerValue())
}

Expand All @@ -1754,7 +1794,9 @@ func TestDrainTracesOnShutdown(t *testing.T) {
}
transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)
coll.hostname = "host123"
coll.Sharder = &sharder.MockSharder{
Self: &sharder.TestShard{Addr: "api1"},
Expand Down Expand Up @@ -1797,10 +1839,9 @@ func TestDrainTracesOnShutdown(t *testing.T) {
go coll.sendSpansOnShutdown(ctx1, sentTraceChan, forwardTraceChan)
require.EventuallyWithT(t, func(collect *assert.CollectT) {
transmission.Mux.Lock()
events := transmission.Events
require.Len(collect, events, 1)
require.Equal(collect, span1.Dataset, events[0].Dataset)
transmission.Mux.Unlock()
defer transmission.Mux.Unlock()
require.Len(collect, transmission.Events, 1)
require.Equal(collect, span1.Dataset, transmission.Events[0].Dataset)
}, 2*time.Second, 100*time.Millisecond)

cancel1()
Expand All @@ -1824,11 +1865,11 @@ func TestDrainTracesOnShutdown(t *testing.T) {
ctx2, cancel2 := context.WithCancel(context.Background())
go coll.sendSpansOnShutdown(ctx2, sentTraceChan, forwardTraceChan)
require.EventuallyWithT(t, func(collect *assert.CollectT) {
transmission.Mux.Lock()
require.Len(collect, transmission.Events, 1)
require.Equal(collect, span2.Dataset, transmission.Events[0].Dataset)
require.Equal(collect, "api2", transmission.Events[0].APIHost)
transmission.Mux.Unlock()
peerTransmission.Mux.Lock()
defer peerTransmission.Mux.Unlock()
require.Len(collect, peerTransmission.Events, 1)
require.Equal(collect, span2.Dataset, peerTransmission.Events[0].Dataset)
require.Equal(collect, "api2", peerTransmission.Events[0].APIHost)
}, 2*time.Second, 100*time.Millisecond)
cancel2()
}
Expand All @@ -1852,7 +1893,9 @@ func TestBigTracesGoEarly(t *testing.T) {

transmission := &transmit.MockTransmission{}
transmission.Start()
coll := newTestCollector(conf, transmission)
peerTransmission := &transmit.MockTransmission{}
peerTransmission.Start()
coll := newTestCollector(conf, transmission, peerTransmission)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
Expand Down
Loading