Skip to content

Commit

Permalink
*: Introduce config coordinator bundling config specific logic
Browse files Browse the repository at this point in the history
Instead of handling all config specific logic inside
Alertmangaer.main(), this patch introduces the config coordinator
component.

Tasks of the config coordinator:
- Load and parse configuration
- Notify subscribers on configuration changes
- Register and manage configuration specific metrics

Signed-off-by: Max Leonard Inden <IndenML@gmail.com>
  • Loading branch information
mxinden committed Feb 20, 2019
1 parent f809c45 commit b5f772f
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 88 deletions.
9 changes: 3 additions & 6 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,9 @@ func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux {

// Update config and resolve timeout of each API. APIv2 also needs
// setAlertStatus to be updated.
func (api *API) Update(cfg *config.Config, resolveTimeout time.Duration, setAlertStatus func(model.LabelSet) error) error {
if err := api.v1.Update(cfg, resolveTimeout); err != nil {
return err
}

return api.v2.Update(cfg, resolveTimeout, setAlertStatus)
func (api *API) Update(cfg *config.Config, setAlertStatus func(model.LabelSet) error) {
api.v1.Update(cfg)
api.v2.Update(cfg, setAlertStatus)
}

func (api *API) limitHandler(h http.Handler) http.Handler {
Expand Down
21 changes: 9 additions & 12 deletions api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,13 @@ func setCORS(w http.ResponseWriter) {

// API provides registration of handlers for API routes.
type API struct {
alerts provider.Alerts
silences *silence.Silences
config *config.Config
route *dispatch.Route
resolveTimeout time.Duration
uptime time.Time
peer *cluster.Peer
logger log.Logger
alerts provider.Alerts
silences *silence.Silences
config *config.Config
route *dispatch.Route
uptime time.Time
peer *cluster.Peer
logger log.Logger

numReceivedAlerts *prometheus.CounterVec
numInvalidAlerts prometheus.Counter
Expand Down Expand Up @@ -153,14 +152,12 @@ func (api *API) Register(r *route.Router) {
}

// Update sets the configuration string to a new value.
func (api *API) Update(cfg *config.Config, resolveTimeout time.Duration) error {
func (api *API) Update(cfg *config.Config) {
api.mtx.Lock()
defer api.mtx.Unlock()

api.resolveTimeout = resolveTimeout
api.config = cfg
api.route = dispatch.NewRoute(cfg.Route, nil)
return nil
}

type errorType string
Expand Down Expand Up @@ -432,7 +429,7 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
now := time.Now()

api.mtx.RLock()
resolveTimeout := api.resolveTimeout
resolveTimeout := time.Duration(api.config.Global.ResolveTimeout)
api.mtx.RUnlock()

for _, alert := range alerts {
Expand Down
6 changes: 6 additions & 0 deletions api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ func TestAddAlerts(t *testing.T) {

alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil)
defaultGlobalConfig := config.DefaultGlobalConfig()
route := config.Route{}
api.Update(&config.Config{
Global: &defaultGlobalConfig,
Route: &route,
})

r, err := http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b))
w := httptest.NewRecorder()
Expand Down
9 changes: 3 additions & 6 deletions api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ type API struct {
getAlertStatus getAlertStatusFn
uptime time.Time

// mtx protects resolveTimeout, alertmanagerConfig, setAlertStatus and route.
// mtx protects alertmanagerConfig, setAlertStatus and route.
mtx sync.RWMutex
// resolveTimeout represents the default resolve timeout that an alert is
// assigned if no end time is specified.
resolveTimeout time.Duration
alertmanagerConfig *config.Config
route *dispatch.Route
setAlertStatus setAlertStatusFn
Expand Down Expand Up @@ -123,15 +122,13 @@ func NewAPI(
}

// Update sets the API struct members that may change between reloads of alertmanager.
func (api *API) Update(cfg *config.Config, resolveTimeout time.Duration, setAlertStatus setAlertStatusFn) error {
func (api *API) Update(cfg *config.Config, setAlertStatus setAlertStatusFn) {
api.mtx.Lock()
defer api.mtx.Unlock()

api.resolveTimeout = resolveTimeout
api.alertmanagerConfig = cfg
api.route = dispatch.NewRoute(cfg.Route, nil)
api.setAlertStatus = setAlertStatus
return nil
}

func (api *API) getStatusHandler(params general_ops.GetStatusParams) middleware.Responder {
Expand Down Expand Up @@ -336,7 +333,7 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
now := time.Now()

api.mtx.RLock()
resolveTimeout := api.resolveTimeout
resolveTimeout := time.Duration(api.alertmanagerConfig.Global.ResolveTimeout)
api.mtx.RUnlock()

for _, alert := range alerts {
Expand Down
83 changes: 19 additions & 64 deletions cmd/alertmanager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ package main

import (
"context"
"crypto/md5"
"encoding/binary"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -56,18 +54,6 @@ import (
)

var (
configHash = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "alertmanager_config_hash",
Help: "Hash of the currently loaded alertmanager configuration.",
})
configSuccess = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "alertmanager_config_last_reload_successful",
Help: "Whether the last configuration reload attempt was successful.",
})
configSuccessTime = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "alertmanager_config_last_reload_success_timestamp_seconds",
Help: "Timestamp of the last successful configuration reload.",
})
requestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "alertmanager_http_request_duration_seconds",
Expand All @@ -88,9 +74,6 @@ var (
)

func init() {
prometheus.MustRegister(configSuccess)
prometheus.MustRegister(configSuccessTime)
prometheus.MustRegister(configHash)
prometheus.MustRegister(requestDuration)
prometheus.MustRegister(responseSize)
prometheus.MustRegister(version.NewCollector("alertmanager"))
Expand Down Expand Up @@ -263,14 +246,6 @@ func run() int {
}
defer alerts.Close()

var (
inhibitor *inhibit.Inhibitor
tmpl *template.Template
pipeline notify.Stage
disp *dispatch.Dispatcher
)
defer disp.Stop()

api, err := api.New(api.Options{
Alerts: alerts,
Silences: silences,
Expand Down Expand Up @@ -304,30 +279,24 @@ func run() int {
return d + waitFunc()
}

var hash float64
reload := func() (err error) {
level.Info(logger).Log("msg", "Loading configuration file", "file", *configFile)
defer func() {
if err != nil {
level.Error(logger).Log("msg", "Loading configuration file failed", "file", *configFile, "err", err)
configSuccess.Set(0)
} else {
configSuccess.Set(1)
configSuccessTime.Set(float64(time.Now().Unix()))
configHash.Set(hash)
}
}()

conf, plainCfg, err := config.LoadFile(*configFile)
if err != nil {
return err
}
var (
inhibitor *inhibit.Inhibitor
tmpl *template.Template
pipeline notify.Stage
disp *dispatch.Dispatcher
)

hash = md5HashAsMetricValue(plainCfg)
defer disp.Stop()

configCoordinator := config.NewCoordinator(
*configFile,
prometheus.DefaultRegisterer,
log.With(logger, "component", "configuration"),
)
configCoordinator.Subscribe(func(conf *config.Config) {
tmpl, err = template.FromGlobs(conf.Templates...)
if err != nil {
return err
level.Error(logger).Log("err", fmt.Errorf("failed to parse templates: %v", err.Error()))
}
tmpl.ExternalURL = amURL

Expand All @@ -347,20 +316,15 @@ func run() int {
logger,
)

err = api.Update(conf, time.Duration(conf.Global.ResolveTimeout), setAlertStatus(inhibitor, marker, silences))
if err != nil {
return err
}
api.Update(conf, setAlertStatus(inhibitor, marker, silences))

disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker, timeoutFunc, logger)

go disp.Run()
go inhibitor.Run()
})

return nil
}

if err := reload(); err != nil {
if err := configCoordinator.Reload(); err != nil {
return 1
}

Expand Down Expand Up @@ -413,9 +377,9 @@ func run() int {
select {
case <-hup:
// ignore error, already logged in `reload()`
_ = reload()
_ = configCoordinator.Reload()
case errc := <-webReload:
errc <- reload()
errc <- configCoordinator.Reload()
}
}
}()
Expand Down Expand Up @@ -470,15 +434,6 @@ func extURL(listen, external string) (*url.URL, error) {
return u, nil
}

func md5HashAsMetricValue(data []byte) float64 {
sum := md5.Sum(data)
// We only want 48 bits as a float64 only has a 53 bit mantissa.
smallSum := sum[0:6]
var bytes = make([]byte, 8)
copy(bytes, smallSum)
return float64(binary.LittleEndian.Uint64(bytes))
}

func setAlertStatus(inhibitor *inhibit.Inhibitor, marker types.Marker, silences *silence.Silences) func(model.LabelSet) error {
return func(labels model.LabelSet) error {
inhibitor.Mutes(labels)
Expand Down
Loading

0 comments on commit b5f772f

Please sign in to comment.