From 4a78150b1e20e89b1a9e57e86ee6028d4e29936a Mon Sep 17 00:00:00 2001 From: Chris Toshok Date: Fri, 17 Apr 2020 17:41:33 -0700 Subject: [PATCH 1/2] it's possible for multiple goroutines to end up in this particular select case at the same time. let's make sure that even if that happens they can't both try to close the channel or send the trace. --- collect/collect.go | 22 ++++++++++++---------- types/event.go | 3 +++ 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/collect/collect.go b/collect/collect.go index 6afcb0143a..c7fbbb03b5 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -402,16 +402,18 @@ func (i *InMemCollector) sendAfterRootDelay(trace *types.Trace) { func (i *InMemCollector) pauseAndSend(pause time.Duration, trace *types.Trace) { select { case <-time.After(pause): - // TODO fix FinishTime to be the time of the last span + its duration rather - // than whenever the timer goes off. - trace.FinishTime = time.Now() - // close the channel so all other timers expire - close(trace.CancelSending) - i.Logger. - WithField("trace_id", trace.TraceID). - WithField("pause_dur", pause). - Debugf("pauseAndSend wait finished; sending trace.") - i.toSend <- &sendSignal{trace} + trace.SendOnce.Do(func() { + // TODO fix FinishTime to be the time of the last span + its duration rather + // than whenever the timer goes off. + trace.FinishTime = time.Now() + // close the channel so all other timers expire + close(trace.CancelSending) + i.Logger. + WithField("trace_id", trace.TraceID). + WithField("pause_dur", pause). + Debugf("pauseAndSend wait finished; sending trace.") + i.toSend <- &sendSignal{trace} + }) case <-trace.CancelSending: // CancelSending channel is closed, meaning someone else sent the trace. 
diff --git a/types/event.go b/types/event.go index e3d3284be0..eb4f825f7c 100644 --- a/types/event.go +++ b/types/event.go @@ -2,6 +2,7 @@ package types import ( "context" + "sync" "time" ) @@ -98,6 +99,8 @@ type Trace struct { // Closing this channel will cause any still-waiting send timers to exit. CancelSending chan struct{} + SendOnce sync.Once + // spans is the list of spans in this trace, protected by the list lock spans []*Span } From f4525056e7bc1803f8e72fb57b966678b9bd7ac3 Mon Sep 17 00:00:00 2001 From: Chris Toshok Date: Mon, 20 Apr 2020 10:33:28 -0700 Subject: [PATCH 2/2] nuke sendAfterIdleTimeout --- collect/collect.go | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/collect/collect.go b/collect/collect.go index c7fbbb03b5..bfe3ad87ce 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -362,30 +362,6 @@ func (i *InMemCollector) sendAfterTraceTimeout(trace *types.Trace) { go i.pauseAndSend(dur, trace) } -// if the configuration says "send the trace when no new spans have come in for -// X seconds" this function will cancel all outstanding send timers and start a -// new one. To prevent infinitely postponed traces, there is still the (TODO) -// total number of spans cap and a (TODO) gloabal time since first seen cap. -// -// TODO this is not yet actually implemented, but leaving the function here as a -// reminder that it'd be an interesting config to add. -func (i *InMemCollector) sendAfterIdleTimeout(trace *types.Trace) { - // cancel all outstanding sending timers - close(trace.CancelSending) - - // get the configured delay - spanSeenDelay, err := i.Config.GetSpanSeenDelay() - if err != nil { - i.Logger.Errorf("failed to get send delay. 
pausing for 2 seconds") - spanSeenDelay = 2 - } - - // make a new cancel sending channel and then wait on it - trace.CancelSending = make(chan struct{}) - dur := time.Duration(spanSeenDelay) * time.Second - go i.pauseAndSend(dur, trace) -} - // sendAfterRootDelay waits the SendDelay timeout then registers the trace to be // sent. func (i *InMemCollector) sendAfterRootDelay(trace *types.Trace) {