Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce concurrency limit for GET requests and a general timeout for HTTP #1743

Merged
merged 4 commits into from
Feb 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 147 additions & 22 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
package api

import (
"errors"
"fmt"
"net/http"
"runtime"
"time"

apiv1 "github.com/prometheus/alertmanager/api/v1"
Expand All @@ -24,6 +27,7 @@ import (
"github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/silence"
"github.com/prometheus/alertmanager/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"

Expand All @@ -32,56 +36,149 @@ import (

// API represents all APIs of Alertmanager.
type API struct {
v1 *apiv1.API
v2 *apiv2.API
v1 *apiv1.API
v2 *apiv2.API
requestsInFlight prometheus.Gauge
concurrencyLimitExceeded prometheus.Counter
timeout time.Duration
inFlightSem chan struct{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only used in the handler, right? How about initializing it in the handler constructor closure?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that we call limitHandler twice, once to handle the router handler, once to handle the apiv2 handler. If we moved the inFlightSem into limitHandler, we would limit the GET requests separately for each one. But we want one limit for all of the HTTP handling. (That was the reason why I had a function that returns a function (the middleware) that returns a function (the HandlerFunc) in the previous version. I could avoid that confusion by having inFlightSem as a member of API and limitHandler a method of API.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mxinden did my comment clarify the issue? Or do you think this should be changed in another way?

}

// Options for the creation of an API object. Alerts, Silences, and StatusFunc
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding detailed comments here.

// are mandatory to set. The zero value for everything else is a safe default.
type Options struct {
// Alerts to be used by the API. Mandatory.
Alerts provider.Alerts
// Silences to be used by the API. Mandatory.
Silences *silence.Silences
// StatusFunc is used be the API to retrieve the AlertStatus of an
// alert. Mandatory.
StatusFunc func(model.Fingerprint) types.AlertStatus
// Peer from the gossip cluster. If nil, no clustering will be used.
Peer *cluster.Peer
// Timeout for all HTTP connections. The zero value (and negative
// values) result in no timeout.
Timeout time.Duration
// Concurrency limit for GET requests. The zero value (and negative
// values) result in a limit of GOMAXPROCS or 8, whichever is
// larger. Status code 503 is served for GET requests that would exceed
// the concurrency limit.
Concurrency int
// Logger is used for logging, if nil, no logging will happen.
Logger log.Logger
// Registry is used to register Prometheus metrics. If nil, no metrics
// registration will happen.
Registry prometheus.Registerer
}

func (o Options) validate() error {
if o.Alerts == nil {
return errors.New("mandatory field Alerts not set")
}
if o.Silences == nil {
return errors.New("mandatory field Silences not set")
}
if o.StatusFunc == nil {
return errors.New("mandatory field StatusFunc not set")
}
return nil
}

// New creates a new API object combining all API versions.
func New(
alerts provider.Alerts,
silences *silence.Silences,
sf func(model.Fingerprint) types.AlertStatus,
peer *cluster.Peer,
l log.Logger,
) (*API, error) {
func New(opts Options) (*API, error) {
if err := opts.validate(); err != nil {
return nil, fmt.Errorf("invalid API options: %s", err)
}
l := opts.Logger
if l == nil {
l = log.NewNopLogger()
}
concurrency := opts.Concurrency
if concurrency < 1 {
concurrency = runtime.GOMAXPROCS(0)
if concurrency < 8 {
concurrency = 8
}
}

v1 := apiv1.New(
alerts,
silences,
sf,
peer,
opts.Alerts,
opts.Silences,
opts.StatusFunc,
opts.Peer,
log.With(l, "version", "v1"),
opts.Registry,
)

v2, err := apiv2.NewAPI(
alerts,
sf,
silences,
peer,
opts.Alerts,
opts.StatusFunc,
opts.Silences,
opts.Peer,
log.With(l, "version", "v2"),
)

if err != nil {
return nil, err
}

// TODO(beorn7): For now, this hardcodes the method="get" label. Other
// methods should get the same instrumentation.
requestsInFlight := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "alertmanager_http_requests_in_flight",
Help: "Current number of HTTP requests being processed.",
ConstLabels: prometheus.Labels{"method": "get"},
})
concurrencyLimitExceeded := prometheus.NewCounter(prometheus.CounterOpts{
Name: "alertmanager_http_concurrency_limit_exceeded_total",
Help: "Total number of times an HTTP request failed because the concurrency limit was reached.",
ConstLabels: prometheus.Labels{"method": "get"},
})
if opts.Registry != nil {
if err := opts.Registry.Register(requestsInFlight); err != nil {
return nil, err
}
if err := opts.Registry.Register(concurrencyLimitExceeded); err != nil {
return nil, err
}
}

return &API{
v1: v1,
v2: v2,
v1: v1,
v2: v2,
requestsInFlight: requestsInFlight,
concurrencyLimitExceeded: concurrencyLimitExceeded,
timeout: opts.Timeout,
inFlightSem: make(chan struct{}, concurrency),
}, nil
}

// Register all APIs with the given router and return a mux.
// Register all APIs. It registers APIv1 with the provided router directly. As
// APIv2 works on the http.Handler level, this method also creates a new
// http.ServeMux and then uses it to register both the provided router (to
// handle "/") and APIv2 (to handle "<routePrefix>/api/v2"). The method returns
// the newly created http.ServeMux. If a timeout has been set on construction of
// API, it is enforced for all HTTP request going through this mux. The same is
// true for the concurrency limit, with the exception that it is only applied to
// GET requests.
func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux {
api.v1.Register(r.WithPrefix("/api/v1"))

mux := http.NewServeMux()
mux.Handle("/", r)
mux.Handle("/", api.limitHandler(r))

apiPrefix := ""
if routePrefix != "/" {
apiPrefix = routePrefix
}
mux.Handle(apiPrefix+"/api/v2/", http.StripPrefix(apiPrefix+"/api/v2", api.v2.Handler))
// TODO(beorn7): HTTP instrumentation is only in place for Router. Since
// /api/v2 works on the Handler level, it is currently not instrumented
// at all (with the exception of requestsInFlight, which is handled in
// limitHandler below).
mux.Handle(
apiPrefix+"/api/v2/",
api.limitHandler(http.StripPrefix(apiPrefix+"/api/v2", api.v2.Handler)),
)

return mux
}
Expand All @@ -94,3 +191,31 @@ func (api *API) Update(cfg *config.Config, resolveTimeout time.Duration) error {

return api.v2.Update(cfg, resolveTimeout)
}

func (api *API) limitHandler(h http.Handler) http.Handler {
concLimiter := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodGet { // Only limit concurrency of GETs.
select {
case api.inFlightSem <- struct{}{}: // All good, carry on.
api.requestsInFlight.Inc()
defer func() {
<-api.inFlightSem
api.requestsInFlight.Dec()
}()
default:
api.concurrencyLimitExceeded.Inc()
http.Error(rsp, fmt.Sprintf(
"Limit of concurrent GET requests reached (%d), try again later.\n", cap(api.inFlightSem),
), http.StatusServiceUnavailable)
return
}
}
h.ServeHTTP(rsp, req)
})
if api.timeout <= 0 {
return concLimiter
}
return http.TimeoutHandler(concLimiter, api.timeout, fmt.Sprintf(
"Exceeded configured timeout of %v.\n", api.timeout,
))
}
68 changes: 36 additions & 32 deletions api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,6 @@ import (
"github.com/prometheus/alertmanager/types"
)

var (
numReceivedAlerts = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "alerts_received_total",
Help: "The total number of received alerts.",
}, []string{"status"})

numInvalidAlerts = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "alerts_invalid_total",
Help: "The total number of received alerts that were invalid.",
})
)

func init() {
numReceivedAlerts.WithLabelValues("firing")
numReceivedAlerts.WithLabelValues("resolved")

prometheus.MustRegister(numReceivedAlerts)
prometheus.MustRegister(numInvalidAlerts)
}

var corsHeaders = map[string]string{
"Access-Control-Allow-Headers": "Accept, Authorization, Content-Type, Origin",
"Access-Control-Allow-Methods": "GET, DELETE, OPTIONS",
Expand Down Expand Up @@ -98,6 +76,9 @@ type API struct {
peer *cluster.Peer
logger log.Logger

numReceivedAlerts *prometheus.CounterVec
numInvalidAlerts prometheus.Counter

getAlertStatus getAlertStatusFn

mtx sync.RWMutex
Expand All @@ -112,18 +93,38 @@ func New(
sf getAlertStatusFn,
peer *cluster.Peer,
l log.Logger,
r prometheus.Registerer,
) *API {
if l == nil {
l = log.NewNopLogger()
}

numReceivedAlerts := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "alerts_received_total",
Help: "The total number of received alerts.",
}, []string{"status"})

numInvalidAlerts := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "alerts_invalid_total",
Help: "The total number of received alerts that were invalid.",
})
numReceivedAlerts.WithLabelValues("firing")
numReceivedAlerts.WithLabelValues("resolved")
if r != nil {
r.MustRegister(numReceivedAlerts, numInvalidAlerts)
}

return &API{
alerts: alerts,
silences: silences,
getAlertStatus: sf,
uptime: time.Now(),
peer: peer,
logger: l,
alerts: alerts,
silences: silences,
getAlertStatus: sf,
uptime: time.Now(),
peer: peer,
logger: l,
numReceivedAlerts: numReceivedAlerts,
numInvalidAlerts: numInvalidAlerts,
}
}

Expand Down Expand Up @@ -253,6 +254,7 @@ func (api *API) listAlerts(w http.ResponseWriter, r *http.Request) {
// are no alerts present
res = []*Alert{}
matchers = []*labels.Matcher{}
ctx = r.Context()

showActive, showInhibited bool
showSilenced, showUnprocessed bool
Expand Down Expand Up @@ -326,11 +328,13 @@ func (api *API) listAlerts(w http.ResponseWriter, r *http.Request) {
defer alerts.Close()

api.mtx.RLock()
// TODO(fabxc): enforce a sensible timeout.
for a := range alerts.Next() {
if err = alerts.Err(); err != nil {
break
}
if err = ctx.Err(); err != nil {
break
}

routes := api.route.Match(a.Labels)
receivers := make([]string, 0, len(routes))
Expand Down Expand Up @@ -449,9 +453,9 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
alert.EndsAt = now.Add(resolveTimeout)
}
if alert.EndsAt.After(time.Now()) {
numReceivedAlerts.WithLabelValues("firing").Inc()
api.numReceivedAlerts.WithLabelValues("firing").Inc()
} else {
numReceivedAlerts.WithLabelValues("resolved").Inc()
api.numReceivedAlerts.WithLabelValues("resolved").Inc()
}
}

Expand All @@ -465,7 +469,7 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*

if err := a.Validate(); err != nil {
validationErrs.Add(err)
numInvalidAlerts.Inc()
api.numInvalidAlerts.Inc()
continue
}
validAlerts = append(validAlerts, a)
Expand Down
4 changes: 2 additions & 2 deletions api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestAddAlerts(t *testing.T) {
}

alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil)

r, err := http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b))
w := httptest.NewRecorder()
Expand Down Expand Up @@ -259,7 +259,7 @@ func TestListAlerts(t *testing.T) {
},
} {
alertsProvider := newFakeAlerts(alerts, tc.err)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil)
api.route = dispatch.NewRoute(&config.Route{Receiver: "def-receiver"}, nil)

r, err := http.NewRequest("GET", "/api/v1/alerts", nil)
Expand Down
5 changes: 4 additions & 1 deletion api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
// are no alerts present
res = open_api_models.GettableAlerts{}
matchers = []*labels.Matcher{}
ctx = params.HTTPRequest.Context()
)

if params.Filter != nil {
Expand Down Expand Up @@ -224,11 +225,13 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
defer alerts.Close()

api.mtx.RLock()
// TODO(fabxc): enforce a sensible timeout.
for a := range alerts.Next() {
if err = alerts.Err(); err != nil {
break
}
if err = ctx.Err(); err != nil {
break
}

routes := api.route.Match(a.Labels)
receivers := make([]*open_api_models.Receiver, 0, len(routes))
Expand Down
Loading