Skip to content

Commit

Permalink
Minor refactoring of probe main
Browse files Browse the repository at this point in the history
  • Loading branch information
paulbellamy committed Nov 6, 2015
1 parent d520662 commit aaf6db2
Showing 1 changed file with 48 additions and 45 deletions.
93 changes: 48 additions & 45 deletions probe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,59 +176,39 @@ func main() {
var rpt syncReport
rpt.swap(report.MakeReport())

go func() {
defer done.Done()
spyTick := time.Tick(*spyInterval)

for {
select {
case <-spyTick:
start := time.Now()
for _, ticker := range tickers {
if err := ticker.Tick(); err != nil {
log.Printf("error doing ticker: %v", err)
}
}
go spyLoop(quit, done, *spyInterval, tickers, reporters, taggers, rpt)
go publishLoop(quit, done, *publishInterval, publishers, rpt)

localReport := rpt.copy()
localReport = localReport.Merge(doReport(reporters))
localReport = Apply(localReport, taggers)
rpt.swap(localReport)
log.Printf("%s", <-interrupt())
}

if took := time.Since(start); took > *spyInterval {
log.Printf("report generation took too long (%s)", took)
func spyLoop(quit chan struct{}, done sync.WaitGroup, spyInterval time.Duration, tickers []Ticker, reporters []Reporter, taggers []Tagger, rpt syncReport) {
defer done.Done()
spyTick := time.Tick(spyInterval)

for {
select {
case <-spyTick:
start := time.Now()
for _, ticker := range tickers {
if err := ticker.Tick(); err != nil {
log.Printf("error doing ticker: %v", err)
}

case <-quit:
return
}
}
}()

go func() {
defer done.Done()
var (
pubTick = time.Tick(*publishInterval)
p = xfer.NewReportPublisher(publishers)
)

for {
select {
case <-pubTick:
publishTicks.WithLabelValues().Add(1)
localReport := rpt.swap(report.MakeReport())
localReport.Window = *publishInterval
if err := p.Publish(localReport); err != nil {
log.Printf("publish: %v", err)
}
localReport := rpt.copy()
localReport = localReport.Merge(doReport(reporters))
localReport = Apply(localReport, taggers)
rpt.swap(localReport)

case <-quit:
return
if took := time.Since(start); took > *spyInterval {
log.Printf("report generation took too long (%s)", took)
}
}
}()

log.Printf("%s", <-interrupt())
case <-quit:
return
}
}
}

func doReport(reporters []Reporter) report.Report {
Expand All @@ -251,6 +231,29 @@ func doReport(reporters []Reporter) report.Report {
return result
}

func publishLoop(quit chan struct{}, done sync.WaitGroup, publishInterval time.Duration, publishers xfer.Publisher, rpt syncReport) {
defer done.Done()
var (
pubTick = time.Tick(*publishInterval)
p = xfer.NewReportPublisher(publishers)
)

for {
select {
case <-pubTick:
publishTicks.WithLabelValues().Add(1)
localReport := rpt.swap(report.MakeReport())
localReport.Window = *publishInterval
if err := p.Publish(localReport); err != nil {
log.Printf("publish: %v", err)
}

case <-quit:
return
}
}
}

func interrupt() <-chan os.Signal {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
Expand Down

0 comments on commit aaf6db2

Please sign in to comment.