Skip to content

Commit

Permalink
Merge pull request #57 from honeycombio/toshok.send-using-sync-once
Browse files Browse the repository at this point in the history
It looks to be (theoretically, I haven't verified by testing it) possible for multiple goroutines to end up in this particular switch arm at the same time.

Use a `sync.Once` to make sure that even if that happens they can't both try to close the channel and/or send the trace.
  • Loading branch information
Chris Toshok authored Apr 20, 2020
2 parents 44700e2 + f452505 commit 9c3578f
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 34 deletions.
46 changes: 12 additions & 34 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,30 +362,6 @@ func (i *InMemCollector) sendAfterTraceTimeout(trace *types.Trace) {
go i.pauseAndSend(dur, trace)
}

// if the configuration says "send the trace when no new spans have come in for
// X seconds" this function will cancel all outstanding send timers and start a
// new one. To prevent infinitely postponed traces, there is still the (TODO)
// total number of spans cap and a (TODO) gloabal time since first seen cap.
//
// TODO this is not yet actually implemented, but leaving the function here as a
// reminder that it'd be an interesting config to add.
func (i *InMemCollector) sendAfterIdleTimeout(trace *types.Trace) {
// cancel all outstanding sending timers
close(trace.CancelSending)

// get the configured delay
spanSeenDelay, err := i.Config.GetSpanSeenDelay()
if err != nil {
i.Logger.Errorf("failed to get send delay. pausing for 2 seconds")
spanSeenDelay = 2
}

// make a new cancel sending channel and then wait on it
trace.CancelSending = make(chan struct{})
dur := time.Duration(spanSeenDelay) * time.Second
go i.pauseAndSend(dur, trace)
}

// sendAfterRootDelay waits the SendDelay timeout then registers the trace to be
// sent.
func (i *InMemCollector) sendAfterRootDelay(trace *types.Trace) {
Expand All @@ -402,16 +378,18 @@ func (i *InMemCollector) sendAfterRootDelay(trace *types.Trace) {
func (i *InMemCollector) pauseAndSend(pause time.Duration, trace *types.Trace) {
select {
case <-time.After(pause):
// TODO fix FinishTime to be the time of the last span + its duration rather
// than whenever the timer goes off.
trace.FinishTime = time.Now()
// close the channel so all other timers expire
close(trace.CancelSending)
i.Logger.
WithField("trace_id", trace.TraceID).
WithField("pause_dur", pause).
Debugf("pauseAndSend wait finished; sending trace.")
i.toSend <- &sendSignal{trace}
trace.SendOnce.Do(func() {
// TODO fix FinishTime to be the time of the last span + its duration rather
// than whenever the timer goes off.
trace.FinishTime = time.Now()
// close the channel so all other timers expire
close(trace.CancelSending)
i.Logger.
WithField("trace_id", trace.TraceID).
WithField("pause_dur", pause).
Debugf("pauseAndSend wait finished; sending trace.")
i.toSend <- &sendSignal{trace}
})

case <-trace.CancelSending:
// CancelSending channel is closed, meaning someone else sent the trace.
Expand Down
3 changes: 3 additions & 0 deletions types/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package types

import (
"context"
"sync"
"time"
)

Expand Down Expand Up @@ -98,6 +99,8 @@ type Trace struct {
// Closing this channel will cause any still-waiting send timers to exit.
CancelSending chan struct{}

SendOnce sync.Once

// spans is the list of spans in this trace, protected by the list lock
spans []*Span
}
Expand Down

0 comments on commit 9c3578f

Please sign in to comment.