diff --git a/probe/main.go b/probe/main.go index 2d92f2cd57..071bc24e69 100644 --- a/probe/main.go +++ b/probe/main.go @@ -176,59 +176,39 @@ func main() { var rpt syncReport rpt.swap(report.MakeReport()) - go func() { - defer done.Done() - spyTick := time.Tick(*spyInterval) - - for { - select { - case <-spyTick: - start := time.Now() - for _, ticker := range tickers { - if err := ticker.Tick(); err != nil { - log.Printf("error doing ticker: %v", err) - } - } + go spyLoop(quit, done, *spyInterval, tickers, reporters, taggers, rpt) + go publishLoop(quit, done, *publishInterval, publishers, rpt) - localReport := rpt.copy() - localReport = localReport.Merge(doReport(reporters)) - localReport = Apply(localReport, taggers) - rpt.swap(localReport) + log.Printf("%s", <-interrupt()) +} - if took := time.Since(start); took > *spyInterval { - log.Printf("report generation took too long (%s)", took) +func spyLoop(quit chan struct{}, done sync.WaitGroup, spyInterval time.Duration, tickers []Ticker, reporters []Reporter, taggers []Tagger, rpt syncReport) { + defer done.Done() + spyTick := time.Tick(spyInterval) + + for { + select { + case <-spyTick: + start := time.Now() + for _, ticker := range tickers { + if err := ticker.Tick(); err != nil { + log.Printf("error doing ticker: %v", err) } - - case <-quit: - return } - } - }() - go func() { - defer done.Done() - var ( - pubTick = time.Tick(*publishInterval) - p = xfer.NewReportPublisher(publishers) - ) - - for { - select { - case <-pubTick: - publishTicks.WithLabelValues().Add(1) - localReport := rpt.swap(report.MakeReport()) - localReport.Window = *publishInterval - if err := p.Publish(localReport); err != nil { - log.Printf("publish: %v", err) - } + localReport := rpt.copy() + localReport = localReport.Merge(doReport(reporters)) + localReport = Apply(localReport, taggers) + rpt.swap(localReport) - case <-quit: - return + if took := time.Since(start); took > *spyInterval { + log.Printf("report generation took too long (%s)", took) } - } - }() - log.Printf("%s", <-interrupt()) + case <-quit: + return + } + } } func doReport(reporters []Reporter) report.Report { @@ -251,6 +231,29 @@ func doReport(reporters []Reporter) report.Report { return result } +func publishLoop(quit chan struct{}, done sync.WaitGroup, publishInterval time.Duration, publishers xfer.Publisher, rpt syncReport) { + defer done.Done() + var ( + pubTick = time.Tick(*publishInterval) + p = xfer.NewReportPublisher(publishers) + ) + + for { + select { + case <-pubTick: + publishTicks.WithLabelValues().Add(1) + localReport := rpt.swap(report.MakeReport()) + localReport.Window = *publishInterval + if err := p.Publish(localReport); err != nil { + log.Printf("publish: %v", err) + } + + case <-quit: + return + } + } +} + func interrupt() <-chan os.Signal { c := make(chan os.Signal) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)