From 3959bdc1a17bff5bbf7e89820fc2139b19581480 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Fri, 8 Sep 2017 17:23:04 -0400 Subject: [PATCH] UPSTREAM: 48583: Record 429 and timeout errors to prometheus UPSTREAM: 49117: Add apiserver metric for response sizes split by namespace scope UPSTREAM: 49678: Report non-resource URLs in max-in-flight correctly UPSTREAM: 49678: Timeout filter returns 504 and an inconsistent error body UPSTREAM: 50123: Report pod logs as CONNECT in metrics UPSTREAM: 52237: Add scope to all apiserver metrics and report resource scope --- .../pkg/registry/core/pod/rest/log.go | 11 +++ .../apiserver/pkg/endpoints/handlers/proxy.go | 23 +++-- .../apiserver/pkg/endpoints/installer.go | 69 ++++++++++----- .../pkg/endpoints/metrics/metrics.go | 85 ++++++++++++++----- .../pkg/endpoints/request/requestinfo.go | 15 ++-- .../k8s.io/apiserver/pkg/server/filters/BUILD | 1 + .../pkg/server/filters/maxinflight.go | 18 +++- .../pkg/server/filters/maxinflight_test.go | 5 +- .../apiserver/pkg/server/filters/timeout.go | 40 ++++++--- .../pkg/server/filters/timeout_test.go | 29 ++++++- 10 files changed, 227 insertions(+), 69 deletions(-) diff --git a/vendor/k8s.io/kubernetes/pkg/registry/core/pod/rest/log.go b/vendor/k8s.io/kubernetes/pkg/registry/core/pod/rest/log.go index a7877975cfd7..cb637257a442 100644 --- a/vendor/k8s.io/kubernetes/pkg/registry/core/pod/rest/log.go +++ b/vendor/k8s.io/kubernetes/pkg/registry/core/pod/rest/log.go @@ -87,3 +87,14 @@ func (r *LogREST) Get(ctx genericapirequest.Context, name string, opts runtime.O func (r *LogREST) NewGetOptions() (runtime.Object, bool, string) { return &api.PodLogOptions{}, false, "" } + +// OverrideMetricsVerb override the GET verb to CONNECT for pod log resource +func (r *LogREST) OverrideMetricsVerb(oldVerb string) (newVerb string) { + newVerb = oldVerb + + if oldVerb == "GET" { + newVerb = "CONNECT" + } + + return +} diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/proxy.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/proxy.go index d856440a836e..435a746b446f 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/proxy.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/proxy.go @@ -53,17 +53,23 @@ type ProxyHandler struct { } func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + reqStart := time.Now() proxyHandlerTraceID := rand.Int63() - var verb string - var apiResource, subresource string + var verb, apiResource, subresource, scope string var httpCode int - reqStart := time.Now() + defer func() { - metrics.Monitor(&verb, &apiResource, &subresource, + responseLength := 0 + if rw, ok := w.(*metrics.ResponseWriterDelegator); ok { + responseLength = rw.ContentLength() + } + metrics.Monitor( + verb, apiResource, subresource, scope, net.GetHTTPClient(req), w.Header().Get("Content-Type"), - httpCode, reqStart) + httpCode, responseLength, reqStart, + ) }() ctx, ok := r.Mapper.Get(req) @@ -86,6 +92,13 @@ func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } verb = requestInfo.Verb namespace, resource, subresource, parts := requestInfo.Namespace, requestInfo.Resource, requestInfo.Subresource, requestInfo.Parts + scope = "cluster" + if namespace != "" { + scope = "namespace" + } + if requestInfo.Name != "" { + scope = "resource" + } ctx = request.WithNamespace(ctx, namespace) if len(parts) < 2 { diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go index 7a412a3e27d6..9651c6f65ad5 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go @@ -62,6 +62,12 @@ type action struct { AllNamespaces bool // true iff the action is namespaced but works on aggregate result for all namespaces } +// An interface to see if one storage supports override its default verb for monitoring +type StorageMetricsOverride interface { + // OverrideMetricsVerb gives a storage object an opportunity to override the verb reported to the metrics endpoint + OverrideMetricsVerb(oldVerb string) (newVerb string) +} + // An interface to see if an object supports swagger documentation as a method type documentable interface { SwaggerDoc() map[string]string @@ -553,15 +559,22 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag versionedObject = defaultVersionedObject } reqScope.Namer = action.Namer - namespaced := "" + + requestScope := "cluster" + var namespaced string + var operationSuffix string if apiResource.Namespaced { + requestScope = "namespace" namespaced = "Namespaced" } - operationSuffix := "" + if len(subresource) > 0 || strings.Contains(action.Path, "/{name}") { + requestScope = "resource" + } if strings.HasSuffix(action.Path, "/{path:*}") { operationSuffix = operationSuffix + "WithPath" } if action.AllNamespaces { + requestScope = "cluster" operationSuffix = operationSuffix + "ForAllNamespaces" namespaced = "" } @@ -576,6 +589,8 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag routes := []*restful.RouteBuilder{} + verbOverrider, needOverride := storage.(StorageMetricsOverride) + switch action.Verb { case "GET": // Get a resource. var handler restful.RouteFunction @@ -584,7 +599,14 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag } else { handler = restfulGetResource(getter, exporter, reqScope) } - handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, handler) + + if needOverride { + // need change the reported verb + handler = metrics.InstrumentRouteFunc(verbOverrider.OverrideMetricsVerb(action.Verb), resource, subresource, requestScope, handler) + } else { + handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, handler) + } + doc := "read the specified " + kind if hasSubresource { doc = "read " + subresource + " of the specified " + kind @@ -613,7 +635,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "list " + subresource + " of objects of kind " + kind } - handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout)) + handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout)) route := ws.GET(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). @@ -645,7 +667,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "replace " + subresource + " of the specified " + kind } - handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulUpdateResource(updater, reqScope, a.group.Typer, admit)) + handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulUpdateResource(updater, reqScope, a.group.Typer, admit)) route := ws.PUT(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). @@ -661,7 +683,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "partially update " + subresource + " of the specified " + kind } - handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulPatchResource(patcher, reqScope, admit, mapping.ObjectConvertor)) + handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulPatchResource(patcher, reqScope, admit, mapping.ObjectConvertor)) route := ws.PATCH(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). @@ -680,7 +702,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag } else { handler = restfulCreateResource(creater, reqScope, a.group.Typer, admit) } - handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, handler) + handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, handler) article := getArticleForNoun(kind, " ") doc := "create" + article + kind if hasSubresource { @@ -702,7 +724,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "delete " + subresource + " of" + article + kind } - handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulDeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit)) + handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulDeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit)) route := ws.DELETE(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). @@ -723,7 +745,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "delete collection of " + subresource + " of a " + kind } - handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulDeleteCollection(collectionDeleter, isCollectionDeleter, reqScope, admit)) + handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulDeleteCollection(collectionDeleter, isCollectionDeleter, reqScope, admit)) route := ws.DELETE(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). @@ -742,7 +764,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "watch changes to " + subresource + " of an object of kind " + kind } - handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout)) + handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout)) route := ws.GET(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). @@ -761,7 +783,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if hasSubresource { doc = "watch individual changes to a list of " + subresource + " of " + kind } - handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout)) + handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulListResource(lister, watcher, reqScope, true, a.minRequestTimeout)) route := ws.GET(action.Path).To(handler). Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). @@ -779,20 +801,20 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag // TODO: DEPRECATED in v1.2. case "PROXY": // Proxy requests to a resource. // Accept all methods as per http://issue.k8s.io/3996 - routes = append(routes, buildProxyRoute(ws, "GET", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix)) - routes = append(routes, buildProxyRoute(ws, "PUT", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix)) - routes = append(routes, buildProxyRoute(ws, "POST", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix)) - routes = append(routes, buildProxyRoute(ws, "PATCH", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix)) - routes = append(routes, buildProxyRoute(ws, "DELETE", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix)) - routes = append(routes, buildProxyRoute(ws, "HEAD", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix)) - routes = append(routes, buildProxyRoute(ws, "OPTIONS", a.prefix, action.Path, proxyHandler, namespaced, kind, resource, subresource, hasSubresource, action.Params, operationSuffix)) + routes = append(routes, buildProxyRoute(ws, "GET", a.prefix, action.Path, kind, resource, subresource, namespaced, requestScope, hasSubresource, action.Params, proxyHandler, operationSuffix)) + routes = append(routes, buildProxyRoute(ws, "PUT", a.prefix, action.Path, kind, resource, subresource, namespaced, requestScope, hasSubresource, action.Params, proxyHandler, operationSuffix)) + routes = append(routes, buildProxyRoute(ws, "POST", a.prefix, action.Path, kind, resource, subresource, namespaced, requestScope, hasSubresource, action.Params, proxyHandler, operationSuffix)) + routes = append(routes, buildProxyRoute(ws, "PATCH", a.prefix, action.Path, kind, resource, subresource, namespaced, requestScope, hasSubresource, action.Params, proxyHandler, operationSuffix)) + routes = append(routes, buildProxyRoute(ws, "DELETE", a.prefix, action.Path, kind, resource, subresource, namespaced, requestScope, hasSubresource, action.Params, proxyHandler, operationSuffix)) + routes = append(routes, buildProxyRoute(ws, "HEAD", a.prefix, action.Path, kind, resource, subresource, namespaced, requestScope, hasSubresource, action.Params, proxyHandler, operationSuffix)) + routes = append(routes, buildProxyRoute(ws, "OPTIONS", a.prefix, action.Path, kind, resource, subresource, namespaced, requestScope, hasSubresource, action.Params, proxyHandler, operationSuffix)) case "CONNECT": for _, method := range connecter.ConnectMethods() { doc := "connect " + method + " requests to " + kind if hasSubresource { doc = "connect " + method + " requests to " + subresource + " of " + kind } - handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulConnectResource(connecter, reqScope, admit, path, hasSubresource)) + handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulConnectResource(connecter, reqScope, admit, path, hasSubresource)) route := ws.Method(method).Path(action.Path). To(handler). Doc(doc). @@ -853,12 +875,17 @@ func routeFunction(handler http.Handler) restful.RouteFunction { } } -func buildProxyRoute(ws *restful.WebService, method string, prefix string, path string, proxyHandler http.Handler, namespaced, kind, resource, subresource string, hasSubresource bool, params []*restful.Parameter, operationSuffix string) *restful.RouteBuilder { +func buildProxyRoute(ws *restful.WebService, + method, prefix, path, kind, resource, subresource, namespaced, requestScope string, + hasSubresource bool, + params []*restful.Parameter, + proxyHandler http.Handler, + operationSuffix string) *restful.RouteBuilder { doc := "proxy " + method + " requests to " + kind if hasSubresource { doc = "proxy " + method + " requests to " + subresource + " of " + kind } - handler := metrics.InstrumentRouteFunc("PROXY", resource, subresource, routeFunction(proxyHandler)) + handler := metrics.InstrumentRouteFunc("PROXY", resource, subresource, requestScope, routeFunction(proxyHandler)) proxyRoute := ws.Method(method).Path(path).To(handler). Doc(doc). Operation("proxy" + strings.Title(method) + namespaced + kind + strings.Title(subresource) + operationSuffix). diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go index 2b5a3e0fd4ca..067131411dec 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go @@ -18,6 +18,7 @@ package metrics import ( "bufio" + //"fmt" "net" "net/http" "regexp" @@ -26,6 +27,7 @@ import ( "time" utilnet "k8s.io/apimachinery/pkg/util/net" + //utilruntime "k8s.io/apimachinery/pkg/util/runtime" "github.com/emicklei/go-restful" "github.com/prometheus/client_golang/prometheus" @@ -39,25 +41,34 @@ var ( Name: "apiserver_request_count", Help: "Counter of apiserver requests broken out for each verb, API resource, client, and HTTP response contentType and code.", }, - []string{"verb", "resource", "subresource", "client", "contentType", "code"}, + []string{"verb", "resource", "subresource", "scope", "client", "contentType", "code"}, ) requestLatencies = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: "apiserver_request_latencies", - Help: "Response latency distribution in microseconds for each verb, resource and client.", + Help: "Response latency distribution in microseconds for each verb, resource and subresource.", // Use buckets ranging from 125 ms to 8 seconds. Buckets: prometheus.ExponentialBuckets(125000, 2.0, 7), }, - []string{"verb", "resource", "subresource"}, + []string{"verb", "resource", "subresource", "scope"}, ) requestLatenciesSummary = prometheus.NewSummaryVec( prometheus.SummaryOpts{ Name: "apiserver_request_latencies_summary", - Help: "Response latency summary in microseconds for each verb and resource.", + Help: "Response latency summary in microseconds for each verb, resource and subresource.", // Make the sliding window of 1h. MaxAge: time.Hour, }, - []string{"verb", "resource", "subresource"}, + []string{"verb", "resource", "subresource", "scope"}, + ) + responseSizes = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "apiserver_response_sizes", + Help: "Response size distribution in bytes for each verb, resource, subresource and scope (namespace/cluster).", + // Use buckets ranging from 1000 bytes (1KB) to 10^9 bytes (1GB). + Buckets: prometheus.ExponentialBuckets(1000, 10.0, 7), + }, + []string{"verb", "resource", "subresource", "scope"}, ) kubectlExeRegexp = regexp.MustCompile(`^.*((?i:kubectl\.exe))`) ) @@ -67,28 +78,57 @@ func Register() { prometheus.MustRegister(requestCounter) prometheus.MustRegister(requestLatencies) prometheus.MustRegister(requestLatenciesSummary) + prometheus.MustRegister(responseSizes) } -func Monitor(verb, resource, subresource *string, client, contentType string, httpCode int, reqStart time.Time) { +// Monitor records a request to the apiserver endpoints that follow the Kubernetes API conventions. verb must be +// uppercase to be backwards compatible with existing monitoring tooling. +func Monitor(verb, resource, subresource, scope, client, contentType string, httpCode, respSize int, reqStart time.Time) { elapsed := float64((time.Since(reqStart)) / time.Microsecond) - requestCounter.WithLabelValues(*verb, *resource, *subresource, client, contentType, codeToString(httpCode)).Inc() - requestLatencies.WithLabelValues(*verb, *resource, *subresource).Observe(elapsed) - requestLatenciesSummary.WithLabelValues(*verb, *resource, *subresource).Observe(elapsed) + requestCounter.WithLabelValues(verb, resource, subresource, scope, client, contentType, codeToString(httpCode)).Inc() + requestLatencies.WithLabelValues(verb, resource, subresource, scope).Observe(elapsed) + requestLatenciesSummary.WithLabelValues(verb, resource, subresource, scope).Observe(elapsed) + // We are only interested in response sizes of read requests. + if verb == "GET" || verb == "LIST" { + responseSizes.WithLabelValues(verb, resource, subresource, scope).Observe(float64(respSize)) + } +} + +// MonitorRequest handles standard transformations for client and the reported verb and then invokes Monitor to record +// a request. verb must be uppercase to be backwards compatible with existing monitoring tooling. +func MonitorRequest(request *http.Request, verb, resource, subresource, scope, contentType string, httpCode, respSize int, reqStart time.Time) { + reportedVerb := verb + if verb == "LIST" { + // see apimachinery/pkg/runtime/conversion.go Convert_Slice_string_To_bool + if values := request.URL.Query()["watch"]; len(values) > 0 { + if value := strings.ToLower(values[0]); value != "0" && value != "false" { + reportedVerb = "WATCH" + } + } + } + // normalize this for metrics gathering to avoid accidental elision of relevant metrics + if verb == "WATCHLIST" { + verb = "WATCH" + } + + client := cleanUserAgent(utilnet.GetHTTPClient(request)) + Monitor(reportedVerb, resource, subresource, scope, client, contentType, httpCode, respSize, reqStart) } func Reset() { requestCounter.Reset() requestLatencies.Reset() requestLatenciesSummary.Reset() + responseSizes.Reset() } // InstrumentRouteFunc works like Prometheus' InstrumentHandlerFunc but wraps // the go-restful RouteFunction instead of a HandlerFunc -func InstrumentRouteFunc(verb, resource, subresource string, routeFunc restful.RouteFunction) restful.RouteFunction { +func InstrumentRouteFunc(verb, resource, subresource, scope string, routeFunc restful.RouteFunction) restful.RouteFunction { return restful.RouteFunction(func(request *restful.Request, response *restful.Response) { now := time.Now() - delegate := &responseWriterDelegator{ResponseWriter: response.ResponseWriter} + delegate := &ResponseWriterDelegator{ResponseWriter: response.ResponseWriter} _, cn := response.ResponseWriter.(http.CloseNotifier) _, fl := response.ResponseWriter.(http.Flusher) @@ -103,11 +143,7 @@ func InstrumentRouteFunc(verb, resource, subresource string, routeFunc restful.R routeFunc(request, response) - reportedVerb := verb - if verb == "LIST" && strings.ToLower(request.QueryParameter("watch")) == "true" { - reportedVerb = "WATCH" - } - Monitor(&reportedVerb, &resource, &subresource, cleanUserAgent(utilnet.GetHTTPClient(request.Request)), rw.Header().Get("Content-Type"), delegate.status, now) + MonitorRequest(request.Request, verb, resource, subresource, scope, delegate.Header().Get("Content-Type"), delegate.Status(), delegate.ContentLength(), now) }) } @@ -121,7 +157,8 @@ func cleanUserAgent(ua string) string { return ua } -type responseWriterDelegator struct { +// ResponseWriterDelegator interface wraps http.ResponseWriter to additionally record content-length, status-code, etc. +type ResponseWriterDelegator struct { http.ResponseWriter status int @@ -129,13 +166,13 @@ type responseWriterDelegator struct { wroteHeader bool } -func (r *responseWriterDelegator) WriteHeader(code int) { +func (r *ResponseWriterDelegator) WriteHeader(code int) { r.status = code r.wroteHeader = true r.ResponseWriter.WriteHeader(code) } -func (r *responseWriterDelegator) Write(b []byte) (int, error) { +func (r *ResponseWriterDelegator) Write(b []byte) (int, error) { if !r.wroteHeader { r.WriteHeader(http.StatusOK) } @@ -144,8 +181,16 @@ func (r *responseWriterDelegator) Write(b []byte) (int, error) { return n, err } +func (r *ResponseWriterDelegator) Status() int { + return r.status +} + +func (r *ResponseWriterDelegator) ContentLength() int { + return int(r.written) +} + type fancyResponseWriterDelegator struct { - *responseWriterDelegator + *ResponseWriterDelegator } func (f *fancyResponseWriterDelegator) CloseNotify() <-chan bool { diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/request/requestinfo.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/request/requestinfo.go index 989517c91782..07696d3d3e0d 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/request/requestinfo.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints/request/requestinfo.go @@ -201,12 +201,17 @@ func (r *RequestInfoFactory) NewRequestInfo(req *http.Request) (*RequestInfo, er // if there's no name on the request and we thought it was a get before, then the actual verb is a list or a watch if len(requestInfo.Name) == 0 && requestInfo.Verb == "get" { // Assumes v1.ListOptions - // Duplicates logic of Convert_Slice_string_To_bool - switch strings.ToLower(req.URL.Query().Get("watch")) { - case "false", "0", "": + // Any query value that is not 0 or false is considered true + // see apimachinery/pkg/runtime/conversion.go Convert_Slice_string_To_bool + if values := req.URL.Query()["watch"]; len(values) > 0 { + switch strings.ToLower(values[0]) { + case "false", "0": + requestInfo.Verb = "list" + default: + requestInfo.Verb = "watch" + } + } else { requestInfo.Verb = "list" - default: - requestInfo.Verb = "watch" } } // if there's no name on the request and we thought it was a delete before, then the actual verb is deletecollection diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD index 77aa7d00763a..612678c328fc 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD @@ -45,6 +45,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/httplog:go_default_library", ], diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go index b5192bc590fc..a0208ed034df 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go @@ -19,10 +19,12 @@ package filters import ( "fmt" "net/http" + "strings" + "time" - "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/apiserver/pkg/endpoints/metrics" apirequest "k8s.io/apiserver/pkg/endpoints/request" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" @@ -106,6 +108,18 @@ func WithMaxInFlightLimit( } } } + scope := "cluster" + if requestInfo.Namespace != "" { + scope = "namespace" + } + if requestInfo.Name != "" { + scope = "resource" + } + if requestInfo.IsResourceRequest { + metrics.MonitorRequest(r, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", scope, http.StatusTooManyRequests, 0, time.Now()) + } else { + metrics.MonitorRequest(r, strings.ToUpper(requestInfo.Verb), "", requestInfo.Path, "", scope, http.StatusTooManyRequests, 0, time.Now()) + } tooManyRequests(r, w) } } @@ -115,5 +129,5 @@ func WithMaxInFlightLimit( func tooManyRequests(req *http.Request, w http.ResponseWriter) { // Return a 429 status indicating "Too Many Requests" w.Header().Set("Retry-After", retryAfter) - http.Error(w, "Too many requests, please try again later.", errors.StatusTooManyRequests) + http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests) } diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go index 79e185e818c7..fc302b22b4f7 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight_test.go @@ -24,7 +24,6 @@ import ( "sync" "testing" - "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/authentication/user" apifilters "k8s.io/apiserver/pkg/endpoints/filters" @@ -148,7 +147,7 @@ func TestMaxInFlightNonMutating(t *testing.T) { // Do this multiple times to show that rate limit rejected requests don't block. for i := 0; i < 2; i++ { - if err := expectHTTPGet(server.URL, errors.StatusTooManyRequests); err != nil { + if err := expectHTTPGet(server.URL, http.StatusTooManyRequests); err != nil { t.Error(err) } } @@ -213,7 +212,7 @@ func TestMaxInFlightMutating(t *testing.T) { // Do this multiple times to show that rate limit rejected requests don't block. for i := 0; i < 2; i++ { - if err := expectHTTPPost(server.URL+"/foo/bar/", errors.StatusTooManyRequests); err != nil { + if err := expectHTTPPost(server.URL+"/foo/bar/", http.StatusTooManyRequests); err != nil { t.Error(err) } } diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/timeout.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/timeout.go index 19894b470bd3..6bc496bdbf22 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/timeout.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/timeout.go @@ -22,11 +22,12 @@ import ( "fmt" "net" "net/http" + "strings" "sync" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/endpoints/metrics" apirequest "k8s.io/apiserver/pkg/endpoints/request" ) @@ -39,24 +40,39 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMa if longRunning == nil { return handler } - timeoutFunc := func(req *http.Request) (<-chan time.Time, *apierrors.StatusError) { + timeoutFunc := func(req *http.Request) (<-chan time.Time, func(), *apierrors.StatusError) { // TODO unify this with apiserver.MaxInFlightLimit ctx, ok := requestContextMapper.Get(req) if !ok { // if this happens, the handler chain isn't setup correctly because there is no context mapper - return time.After(globalTimeout), apierrors.NewInternalError(fmt.Errorf("no context found for request during timeout")) + return time.After(globalTimeout), func() {}, apierrors.NewInternalError(fmt.Errorf("no context found for request during timeout")) } requestInfo, ok := apirequest.RequestInfoFrom(ctx) if !ok { // if this happens, the handler chain isn't setup correctly because there is no request info - return time.After(globalTimeout), apierrors.NewInternalError(fmt.Errorf("no request info found for request during timeout")) + return time.After(globalTimeout), func() {}, apierrors.NewInternalError(fmt.Errorf("no request info found for request during timeout")) } if longRunning(req, requestInfo) { - return nil, nil + return nil, nil, nil } - return time.After(globalTimeout), apierrors.NewServerTimeout(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb, 0) + now := time.Now() + metricFn := func() { + scope := "cluster" + if requestInfo.Namespace != "" { + scope = "namespace" + } + if requestInfo.Name != "" { + scope = "resource" + } + if requestInfo.IsResourceRequest { + metrics.MonitorRequest(req, strings.ToUpper(requestInfo.Verb), requestInfo.Resource, requestInfo.Subresource, "", scope, http.StatusGatewayTimeout, 0, now) + } else { + metrics.MonitorRequest(req, strings.ToUpper(requestInfo.Verb), "", requestInfo.Path, "", scope, http.StatusGatewayTimeout, 0, now) + } + } + return time.After(globalTimeout), metricFn, apierrors.NewTimeoutError(fmt.Sprintf("request did not complete within %s", globalTimeout), 0) } return WithTimeout(handler, timeoutFunc) } @@ -64,22 +80,23 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, requestContextMa // WithTimeout returns an http.Handler that runs h with a timeout // determined by timeoutFunc. The new http.Handler calls h.ServeHTTP to handle // each request, but if a call runs for longer than its time limit, the -// handler responds with a 503 Service Unavailable error and the message +// handler responds with a 504 Gateway Timeout error and the message // provided. (If msg is empty, a suitable default message will be sent.) After // the handler times out, writes by h to its http.ResponseWriter will return // http.ErrHandlerTimeout. If timeoutFunc returns a nil timeout channel, no -// timeout will be enforced. -func WithTimeout(h http.Handler, timeoutFunc func(*http.Request) (timeout <-chan time.Time, err *apierrors.StatusError)) http.Handler { +// timeout will be enforced. recordFn is a function that will be invoked whenever +// a timeout happens. +func WithTimeout(h http.Handler, timeoutFunc func(*http.Request) (timeout <-chan time.Time, recordFn func(), err *apierrors.StatusError)) http.Handler { return &timeoutHandler{h, timeoutFunc} } type timeoutHandler struct { handler http.Handler - timeout func(*http.Request) (<-chan time.Time, *apierrors.StatusError) + timeout func(*http.Request) (<-chan time.Time, func(), *apierrors.StatusError) } func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - after, err := t.timeout(r) + after, recordFn, err := t.timeout(r) if after == nil { t.handler.ServeHTTP(w, r) return @@ -95,6 +112,7 @@ func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { case <-done: return case <-after: + recordFn() tw.timeout(err) } } diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/timeout_test.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/timeout_test.go index 991427231a82..6283a6cc2111 100644 --- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/timeout_test.go +++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/filters/timeout_test.go @@ -22,6 +22,7 @@ import ( "net/http" "net/http/httptest" "reflect" + "sync" "testing" "time" @@ -31,12 +32,30 @@ import ( "k8s.io/apimachinery/pkg/util/diff" ) +type recorder struct { + lock sync.Mutex + count int +} + +func (r *recorder) Record() { + r.lock.Lock() + defer r.lock.Unlock() + r.count++ +} + +func (r *recorder) Count() int { + r.lock.Lock() + defer r.lock.Unlock() + return r.count +} + func TestTimeout(t *testing.T) { sendResponse := make(chan struct{}, 1) writeErrors := make(chan error, 1) timeout := make(chan time.Time, 1) resp := "test response" timeoutErr := apierrors.NewServerTimeout(schema.GroupResource{Group: "foo", Resource: "bar"}, "get", 0) + record := &recorder{} ts := httptest.NewServer(WithTimeout(http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { @@ -44,8 +63,8 @@ func TestTimeout(t *testing.T) { _, err := w.Write([]byte(resp)) writeErrors <- err }), - func(*http.Request) (<-chan time.Time, *apierrors.StatusError) { - return timeout, timeoutErr + func(*http.Request) (<-chan time.Time, func(), *apierrors.StatusError) { + return timeout, record.Record, timeoutErr })) defer ts.Close() @@ -65,6 +84,9 @@ func TestTimeout(t *testing.T) { if err := <-writeErrors; err != nil { t.Errorf("got unexpected Write error on first request: %v", err) } + if record.Count() != 0 { + t.Errorf("invoked record method: %#v", record) + } // Times out timeout <- time.Time{} @@ -83,6 +105,9 @@ func TestTimeout(t *testing.T) { if !reflect.DeepEqual(status, &timeoutErr.ErrStatus) { t.Errorf("unexpected object: %s", diff.ObjectReflectDiff(&timeoutErr.ErrStatus, status)) } + if record.Count() != 1 { + t.Errorf("did not invoke record method: %#v", record) + } // Now try to send a response sendResponse <- struct{}{}