Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bosun: Concurrent checks #1231

Merged
merged 1 commit into from
Aug 20, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions cmd/bosun/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type Conf struct {
UnknownThreshold int
Templates map[string]*Template
Alerts map[string]*Alert
OrderedAlerts []*Alert `json:"-"` //alerts in order they appear.
Notifications map[string]*Notification `json:"-"`
RawText string
Macros map[string]*Macro
Expand Down Expand Up @@ -943,7 +942,6 @@ func (c *Conf) loadAlert(s *parse.SectionNode) {
}
a.returnType = ret
c.Alerts[name] = &a
c.OrderedAlerts = append(c.OrderedAlerts, &a)
}

func (c *Conf) loadNotification(s *parse.SectionNode) {
Expand Down Expand Up @@ -1311,9 +1309,7 @@ func (c *Conf) alert(s *expr.State, T miniprofiler.Timer, name, key string) (res
return nil, err
}
if s.History != nil {

unknownTags, unevalTags := s.History.GetUnknownAndUnevaluatedAlertKeys(name)

// For currently unknown tags NOT in the result set, add an error result
for _, ak := range unknownTags {
found := false
Expand All @@ -1332,14 +1328,22 @@ func (c *Conf) alert(s *expr.State, T miniprofiler.Timer, name, key string) (res
}
}
//For all unevaluated tags in run history, make sure we report a nonzero result.
Loop:
for _, result := range results.Results {
for _, ak := range unevalTags {
for _, ak := range unevalTags {
found := false
for _, result := range results.Results {
if result.Group.Equal(ak.Group()) {
result.Value = expr.Number(1)
break Loop
found = true
break
}
}
if !found {
res := expr.Result{
Value: expr.Number(1),
Group: ak.Group(),
}
results.Results = append(results.Results, &res)
}
}
}
return results, nil
Expand Down
34 changes: 17 additions & 17 deletions cmd/bosun/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package main
import (
"flag"
"fmt"
"log"
"net/http"
"net/http/httptest"
"net/http/httputil"
Expand All @@ -27,6 +26,7 @@ import (
"bosun.org/graphite"
"bosun.org/metadata"
"bosun.org/opentsdb"
"bosun.org/slog"
"bosun.org/version"
)

Expand Down Expand Up @@ -83,7 +83,7 @@ func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
c, err := conf.ParseFile(*flagConf)
if err != nil {
log.Fatal(err)
slog.Fatal(err)
}
if *flagTest {
os.Exit(0)
Expand All @@ -96,10 +96,10 @@ func main() {
httpListen.Host = "localhost" + httpListen.Host
}
if err := metadata.Init(httpListen, false); err != nil {
log.Fatal(err)
slog.Fatal(err)
}
if err := sched.Load(c); err != nil {
log.Fatal(err)
slog.Fatal(err)
}
if c.RelayListen != "" {
go func() {
Expand All @@ -109,12 +109,12 @@ func main() {
Addr: c.RelayListen,
Handler: mux,
}
log.Fatal(s.ListenAndServe())
slog.Fatal(s.ListenAndServe())
}()
}
if c.TSDBHost != "" {
if err := collect.Init(httpListen, "bosun"); err != nil {
log.Fatal(err)
slog.Fatal(err)
}
tsdbHost := &url.URL{
Scheme: "http",
Expand All @@ -129,18 +129,18 @@ func main() {
}
rp.ServeHTTP(w, r)
}))
log.Println("readonly relay at", ts.URL, "to", tsdbHost)
slog.Infoln("readonly relay at", ts.URL, "to", tsdbHost)
tsdbHost, _ = url.Parse(ts.URL)
c.TSDBHost = tsdbHost.Host
}
}
if *flagQuiet {
c.Quiet = true
}
go func() { log.Fatal(web.Listen(c.HTTPListen, *flagDev, c.TSDBHost)) }()
go func() { slog.Fatal(web.Listen(c.HTTPListen, *flagDev, c.TSDBHost)) }()
go func() {
if !*flagNoChecks {
log.Fatal(sched.Run())
sched.Run()
}
}()
go func() {
Expand All @@ -149,14 +149,14 @@ func main() {
killing := false
for range sc {
if killing {
log.Println("Second interrupt: exiting")
slog.Infoln("Second interrupt: exiting")
os.Exit(1)
}
killing = true
go func() {
log.Println("Interrupt: closing down...")
slog.Infoln("Interrupt: closing down...")
sched.Close()
log.Println("done")
slog.Infoln("done")
os.Exit(1)
}()
}
Expand All @@ -177,21 +177,21 @@ func quit() {
func watch(root, pattern string, f func()) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
slog.Fatal(err)
}
filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if matched, err := filepath.Match(pattern, info.Name()); err != nil {
log.Fatal(err)
slog.Fatal(err)
} else if !matched {
return nil
}
err = watcher.Add(path)
if err != nil {
log.Fatal(err)
slog.Fatal(err)
}
return nil
})
log.Println("watching", pattern, "in", root)
slog.Infoln("watching", pattern, "in", root)
wait := time.Now()
go func() {
for {
Expand All @@ -205,7 +205,7 @@ func watch(root, pattern string, f func()) {
wait = time.Now().Add(time.Second * 2)
}
case err := <-watcher.Errors:
log.Println("error:", err)
slog.Errorln("error:", err)
}
}
}()
Expand Down
57 changes: 57 additions & 0 deletions cmd/bosun/sched/alertRunner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package sched

import (
"fmt"
"time"

"bosun.org/cmd/bosun/cache"
"bosun.org/cmd/bosun/conf"
"bosun.org/slog"
)

// Run should be called once (and only once) to start all schedule activity.
func (s *Schedule) Run() error {
if s.Conf == nil {
return fmt.Errorf("sched: nil configuration")
}
s.nc = make(chan interface{}, 1)
if s.Conf.Ping {
go s.PingHosts()
}
go s.dispatchNotifications()
go s.performSave()
go s.updateCheckContext()
for _, a := range s.Conf.Alerts {
go s.RunAlert(a)
}
return nil
}
func (s *Schedule) updateCheckContext() {
for {
ctx := &checkContext{time.Now(), cache.New(0)}
s.ctx = ctx
time.Sleep(s.Conf.CheckFrequency)
s.Lock("CollectStates")
s.CollectStates()
s.Unlock()
}
}
func (s *Schedule) RunAlert(a *conf.Alert) {
for {
wait := time.After(s.Conf.CheckFrequency * time.Duration(a.RunEvery))
s.checkAlert(a)
s.LastCheck = time.Now()
<-wait
}
}

func (s *Schedule) checkAlert(a *conf.Alert) {
checkTime := s.ctx.runTime
checkCache := s.ctx.checkCache
rh := s.NewRunHistory(checkTime, checkCache)
s.CheckAlert(nil, rh, a)

start := time.Now()
s.RunHistory(rh)
slog.Infof("runHistory on %s took %v\n", a.Name, time.Since(start))
}
47 changes: 23 additions & 24 deletions cmd/bosun/sched/bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"os"
"time"

Expand All @@ -18,6 +17,7 @@ import (
"bosun.org/cmd/bosun/expr"
"bosun.org/collect"
"bosun.org/opentsdb"
"bosun.org/slog"
)

func (s *Schedule) performSave() {
Expand Down Expand Up @@ -75,18 +75,18 @@ func (s *Schedule) save() {
cw := &counterWriter{w: gz}
enc := gob.NewEncoder(cw)
if err := enc.Encode(data); err != nil {
log.Printf("error saving %s: %v", name, err)
slog.Errorf("error saving %s: %v", name, err)
s.Unlock()
return
}
if err := gz.Flush(); err != nil {
log.Printf("gzip flush error saving %s: %v", name, err)
slog.Errorf("gzip flush error saving %s: %v", name, err)
}
if err := gz.Close(); err != nil {
log.Printf("gzip close error saving %s: %v", name, err)
slog.Errorf("gzip close error saving %s: %v", name, err)
}
tostore[name] = f.Bytes()
log.Printf("wrote %s: %v", name, conf.ByteSize(cw.written))
slog.Infof("wrote %s: %v", name, conf.ByteSize(cw.written))
collect.Put("statefile.size", opentsdb.TagSet{"object": name}, cw.written)
}
s.Unlock()
Expand All @@ -103,19 +103,19 @@ func (s *Schedule) save() {
return nil
})
if err != nil {
log.Printf("save db update error: %v", err)
slog.Errorf("save db update error: %v", err)
return
}
fi, err := os.Stat(s.Conf.StateFile)
if err == nil {
collect.Put("statefile.size", opentsdb.TagSet{"object": "total"}, fi.Size())
}
log.Println("save to db complete")
slog.Infoln("save to db complete")
}

// RestoreState restores notification and alert state from the file on disk.
func (s *Schedule) RestoreState() error {
log.Println("RestoreState")
slog.Infoln("RestoreState")
start := time.Now()
s.Lock("RestoreState")
defer s.Unlock()
Expand Down Expand Up @@ -143,26 +143,26 @@ func (s *Schedule) RestoreState() error {
return gob.NewDecoder(gr).Decode(dst)
}
if err := decode(dbMetric, &s.Search.Metric); err != nil {
log.Println(dbMetric, err)
slog.Errorln(dbMetric, err)
}
if err := decode(dbTagk, &s.Search.Tagk); err != nil {
log.Println(dbTagk, err)
slog.Errorln(dbTagk, err)
}
if err := decode(dbTagv, &s.Search.Tagv); err != nil {
log.Println(dbTagv, err)
slog.Errorln(dbTagv, err)
}
if err := decode(dbMetricTags, &s.Search.MetricTags); err != nil {
log.Println(dbMetricTags, err)
slog.Errorln(dbMetricTags, err)
}
notifications := make(map[expr.AlertKey]map[string]time.Time)
if err := decode(dbNotifications, &notifications); err != nil {
log.Println(dbNotifications, err)
slog.Errorln(dbNotifications, err)
}
if err := decode(dbSilence, &s.Silence); err != nil {
log.Println(dbSilence, err)
slog.Errorln(dbSilence, err)
}
if err := decode(dbIncidents, &s.Incidents); err != nil {
log.Println(dbIncidents, err)
slog.Errorln(dbIncidents, err)
}
// Calculate next incident id.
for _, i := range s.Incidents {
Expand All @@ -172,7 +172,7 @@ func (s *Schedule) RestoreState() error {
}
status := make(States)
if err := decode(dbStatus, &status); err != nil {
log.Println(dbStatus, err)
slog.Errorln(dbStatus, err)
}
clear := func(r *Result) {
if r == nil {
Expand All @@ -183,10 +183,10 @@ func (s *Schedule) RestoreState() error {
for ak, st := range status {
a, present := s.Conf.Alerts[ak.Name()]
if !present {
log.Println("sched: alert no longer present, ignoring:", ak)
slog.Errorln("sched: alert no longer present, ignoring:", ak)
continue
} else if s.Conf.Squelched(a, st.Group) {
log.Println("sched: alert now squelched:", ak)
slog.Infoln("sched: alert now squelched:", ak)
continue
} else {
t := a.Unknown
Expand All @@ -206,30 +206,29 @@ func (s *Schedule) RestoreState() error {
s.status[ak] = st
if a.Log && st.Open {
st.Open = false
log.Printf("sched: alert %s is now log, closing, was %s", ak, st.Status())
slog.Infof("sched: alert %s is now log, closing, was %s", ak, st.Status())
}
for name, t := range notifications[ak] {
n, present := s.Conf.Notifications[name]
if !present {
log.Println("sched: notification not present during restore:", name)
slog.Infoln("sched: notification not present during restore:", name)
continue
}
if a.Log {
log.Println("sched: alert is now log, removing notification:", ak)
slog.Infoln("sched: alert is now log, removing notification:", ak)
continue
}
s.AddNotification(ak, n, t)
}
}
if err := decode(dbMetadata, &s.Metadata); err != nil {
log.Println(dbMetadata, err)
slog.Errorln(dbMetadata, err)
}
if s.maxIncidentId == 0 {
s.createHistoricIncidents()
}
s.Search.Copy()
s.readStatus = s.status.Copy()
log.Println("RestoreState done in", time.Since(start))
slog.Infoln("RestoreState done in", time.Since(start))
return nil
}

Expand Down
Loading