From 42bfe898d51a3ab023ca674a617b5c8106e19c4d Mon Sep 17 00:00:00 2001 From: Lucas Roesler Date: Sun, 9 Oct 2022 20:57:22 +0200 Subject: [PATCH] feat: allow proxying readiness checks to the function Allow setting an endpoint path for the function readiness check via an ENV variable `function_ready_endpoint` When this value is set, the requests to `/_/ready` will execute an empty GET request with/to the configured endpoint. This allows the function authors to implement custom readiness check logic. This custom request is checked _after_ the the standard liviness checks and the ConcurrencyLimiter check. For a completely custom readiness check, the function should be deployed with `max_inflight == 0` and `function_ready_endpoint` to the custom path. Signed-off-by: Lucas Roesler --- .gitignore | 1 + Makefile | 20 ++-- README.md | 1 + config/config.go | 5 + go.mod | 2 + go.sum | 6 +- main.go | 66 ++++++++++--- readiness_test.go | 94 +++++++++++++++++++ .../concurrency_limiter.go | 11 +++ vendor/modules.txt | 3 +- 10 files changed, 179 insertions(+), 30 deletions(-) create mode 100644 readiness_test.go diff --git a/.gitignore b/.gitignore index c155b5d8..c532e7db 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,4 @@ template bin /handler /Dockerfile2 +.vscode diff --git a/Makefile b/Makefile index 88e9779f..afc6d004 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,11 @@ -.GIT_COMMIT=$(shell git rev-parse HEAD) -.GIT_VERSION=$(shell git describe --tags --always --dirty 2>/dev/null) -.GIT_UNTRACKEDCHANGES := $(shell git status --porcelain --untracked-files=no) -ifneq ($(.GIT_UNTRACKEDCHANGES),) - .GIT_VERSION := $(.GIT_VERSION)-$(shell date +"%s") +GIT_COMMIT=$(shell git rev-parse HEAD) +GIT_VERSION=$(shell git describe --tags --always --dirty 2>/dev/null) +GIT_UNTRACKEDCHANGES := $(shell git status --porcelain --untracked-files=no) +ifneq ($(GIT_UNTRACKEDCHANGES),) + GIT_VERSION := $(GIT_VERSION)-$(shell date +"%s") endif -LDFLAGS := "-s -w -X main.Version=$(.GIT_VERSION) -X main.GitCommit=$(.GIT_COMMIT)" +LDFLAGS := "-s -w -X main.Version=$(GIT_VERSION) -X main.GitCommit=$(GIT_COMMIT)" SERVER?=ghcr.io OWNER?=openfaas @@ -32,9 +32,9 @@ gofmt: build: @echo "+ $@" @docker build \ - --build-arg GIT_COMMIT=${.GIT_COMMIT} \ - --build-arg VERSION=${.GIT_VERSION} \ - -t ${.IMAGE}:${TAG} . + --build-arg GIT_COMMIT=${GIT_COMMIT} \ + --build-arg VERSION=${GIT_VERSION} \ + -t $(SERVER)/$(OWNER)/$(IMG_NAME):$(TAG) . .PHONY: hashgen hashgen: @@ -53,7 +53,7 @@ dist: print-image: @echo ${.IMAGE} -# Example: +# Example: # SERVER=docker.io OWNER=alexellis2 TAG=ready make publish .PHONY: publish publish: diff --git a/README.md b/README.md index e1c25020..39706e49 100644 --- a/README.md +++ b/README.md @@ -163,6 +163,7 @@ Environmental variables: | `http_buffer_req_body` | `http` mode only - buffers request body in memory before forwarding upstream to your template's `upstream_url`. Use if your upstream HTTP server does not accept `Transfer-Encoding: chunked`, for example WSGI tends to require this setting. Default: `false` | | `buffer_http` | deprecated alias for `http_buffer_req_body`, will be removed in future version | | `static_path` | Absolute or relative path to the directory that will be served if `mode="static"` | +| `function_ready_endpoint` | When non-empty, requests to `/_/ready` will invoke the function handler with this path. This can be used to provide custom readiness logic. When `max_inflight` is set, the concurrency limit is checked first before proxying the request to the function. | Unsupported options from the [Classic Watchdog](https://github.com/openfaas/classic-watchdog): diff --git a/config/config.go b/config/config.go index 1dcba00e..98b090c9 100644 --- a/config/config.go +++ b/config/config.go @@ -48,6 +48,10 @@ type WatchdogConfig struct { // LogBufferSize is the size for scanning logs for stdout/stderr LogBufferSize int + + // ReadyEndpoint is the custom readiness path for the watchdog. When non-empty + // the /_/ready endpoint with proxy the request to this path. + ReadyEndpoint string } // Process returns a string for the process and a slice for the arguments from the FunctionProcess. @@ -139,6 +143,7 @@ func New(env []string) (WatchdogConfig, error) { MaxInflight: getInt(envMap, "max_inflight", 0), PrefixLogs: prefixLogs, LogBufferSize: logBufferSize, + ReadyEndpoint: envMap["function_ready_endpoint"], } if val := envMap["mode"]; len(val) > 0 { diff --git a/go.mod b/go.mod index 4f2edc9c..6a02760c 100644 --- a/go.mod +++ b/go.mod @@ -18,3 +18,5 @@ require ( golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 // indirect google.golang.org/protobuf v1.26.0-rc.1 // indirect ) + +replace github.com/openfaas/faas-middleware => github.com/LucasRoesler/faas-middleware v0.0.0-20221009175005-34767291688a diff --git a/go.sum b/go.sum index 9e1092e3..1146dbc3 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/LucasRoesler/faas-middleware v0.0.0-20221009175005-34767291688a h1:HKZa7gP492/iibryQLsj+r/5JkEO8bn2YU4WktW6RNs= +github.com/LucasRoesler/faas-middleware v0.0.0-20221009175005-34767291688a/go.mod h1:RgkVC/llBh+Eqb4bKxcFneB4OMnYsjUrEs7TWrRf51s= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -58,10 +60,6 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/openfaas/faas-middleware v1.0.0 h1:3w7v3sxhR55ulfDFfVHcStAKpkf06ufoZtv3pJkifvI= -github.com/openfaas/faas-middleware v1.0.0/go.mod h1:R5CaeiPy8uo7bWTFJdJJ/9I82XLn6WeXUAjNq2a+afY= -github.com/openfaas/faas-middleware v1.1.0 h1:i6DaUQxrg4FhMpl/to/VIEL6Aq9K/mo9EpRs0JPIqtI= -github.com/openfaas/faas-middleware v1.1.0/go.mod h1:RgkVC/llBh+Eqb4bKxcFneB4OMnYsjUrEs7TWrRf51s= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/main.go b/main.go index 2c3f1179..0c5603ca 100644 --- a/main.go +++ b/main.go @@ -65,6 +65,10 @@ func main() { os.Exit(1) } + if watchdogConfig.ReadyEndpoint != "" { + log.Printf("Using function ready endpoint: %q", watchdogConfig.ReadyEndpoint) + } + requestHandler := buildRequestHandler(watchdogConfig, watchdogConfig.PrefixLogs) var limit *limiter.ConcurrencyLimiter if watchdogConfig.MaxInflight > 0 { @@ -77,7 +81,12 @@ func main() { httpMetrics := metrics.NewHttp() http.HandleFunc("/", metrics.InstrumentHandler(requestHandler, httpMetrics)) http.HandleFunc("/_/health", makeHealthHandler()) - http.HandleFunc("/_/ready", makeReadyHandler(limit)) + http.Handle("/_/ready", &readiness{ + functionHandler: requestHandler, + endpoint: watchdogConfig.ReadyEndpoint, + lockCheck: lockFilePresent, + limiter: limit, + }) metricsServer := metrics.MetricsServer{} metricsServer.Register(watchdogConfig.MetricsPort) @@ -371,27 +380,54 @@ func lockFilePresent() bool { return true } -func makeReadyHandler(limit *limiter.ConcurrencyLimiter) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case http.MethodGet: - status := http.StatusOK +type readiness struct { + functionHandler http.Handler + endpoint string + lockCheck func() bool + limiter Limiter +} - if atomic.LoadInt32(&acceptingConnections) == 0 || !lockFilePresent() { - status = http.StatusServiceUnavailable - } else if limit != nil { - if limit.Met() { - status = http.StatusTooManyRequests - } +func (r *readiness) ServeHTTP(w http.ResponseWriter, req *http.Request) { + switch req.Method { + case http.MethodGet: + status := http.StatusOK + + switch { + case atomic.LoadInt32(&acceptingConnections) == 0, !r.lockCheck(): + status = http.StatusServiceUnavailable + case r.limiter.Met(): + status = http.StatusTooManyRequests + case r.endpoint != "": + upstream := url.URL{ + Scheme: req.URL.Scheme, + Host: req.URL.Host, + Path: r.endpoint, } - w.WriteHeader(status) - default: - w.WriteHeader(http.StatusMethodNotAllowed) + readyReq, err := http.NewRequestWithContext(req.Context(), http.MethodGet, upstream.String(), nil) + if err != nil { + log.Printf("Error creating readiness request: %s", err) + status = http.StatusInternalServerError + break + } + + // we need to set the raw RequestURI for the function invoker to see our URL path, + // otherwise it will just route to `/`, typically this shouldn't be used or set + readyReq.RequestURI = r.endpoint + r.functionHandler.ServeHTTP(w, readyReq) + return } + + w.WriteHeader(status) + default: + w.WriteHeader(http.StatusMethodNotAllowed) } } +type Limiter interface { + Met() bool +} + func makeHealthHandler() func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { switch r.Method { diff --git a/readiness_test.go b/readiness_test.go new file mode 100644 index 00000000..99bc973e --- /dev/null +++ b/readiness_test.go @@ -0,0 +1,94 @@ +package main + +import ( + "net/http" + "net/http/httptest" + "testing" +) + +func TestReadinessHandler(t *testing.T) { + cases := []struct { + name string + endpoint string + limitMet bool + acceptingConnections int32 + readyResponseCode int + expectedCode int + }{ + { + name: "return 503 when not accepting connections", + acceptingConnections: 0, + expectedCode: http.StatusServiceUnavailable, + }, + { + name: "returns 200 when no upstream endpoint and no limiter", + acceptingConnections: 1, + expectedCode: http.StatusOK, + }, + { + name: "returns the upstream endpoint response code when no limiter", + acceptingConnections: 1, + endpoint: "/custom/ready", + readyResponseCode: http.StatusNoContent, + expectedCode: http.StatusNoContent, + }, + { + name: "return 429 when limiter is met", + limitMet: true, + acceptingConnections: 1, + expectedCode: http.StatusTooManyRequests, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + upstream := testUpstreamHandler(tc.endpoint, tc.readyResponseCode) + handler := &readiness{ + functionHandler: upstream, + endpoint: tc.endpoint, + lockCheck: func() bool { return true }, + limiter: &testLimiter{met: tc.limitMet}, + } + + rr := httptest.NewRecorder() + req, err := http.NewRequest(http.MethodGet, "/_/ready", nil) + if err != nil { + t.Fatal(err) + } + + acceptingConnections = tc.acceptingConnections + handler.ServeHTTP(rr, req) + + if status := rr.Code; status != tc.expectedCode { + t.Errorf("handler returned wrong status code - want: %v, got: %v", tc.expectedCode, status) + } + }) + } +} + +func testUpstreamHandler(endpoint string, status int) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != endpoint { + w.WriteHeader(http.StatusNotFound) + return + } + + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + w.WriteHeader(status) + }) +} + +type testLimiter struct { + met bool +} + +func (t *testLimiter) Met() bool { + if t == nil { + return false + } + return t.met +} diff --git a/vendor/github.com/openfaas/faas-middleware/concurrency-limiter/concurrency_limiter.go b/vendor/github.com/openfaas/faas-middleware/concurrency-limiter/concurrency_limiter.go index 9043dcc5..2ceeda99 100644 --- a/vendor/github.com/openfaas/faas-middleware/concurrency-limiter/concurrency_limiter.go +++ b/vendor/github.com/openfaas/faas-middleware/concurrency-limiter/concurrency_limiter.go @@ -39,6 +39,10 @@ type ConcurrencyLimiter struct { } func (cl *ConcurrencyLimiter) Met() bool { + if cl == nil { + return false + } + // We should not have any ConcurrencyLimiter created with a limit of 0 // but return early if that's the case. if cl.maxInflightRequests == 0 { @@ -55,6 +59,7 @@ func (cl *ConcurrencyLimiter) ServeHTTP(w http.ResponseWriter, r *http.Request) // but we'll check anyway and return early. if cl.maxInflightRequests == 0 { cl.backendHTTPHandler.ServeHTTP(w, r) + return } requestsStarted := atomic.AddUint64(&cl.requestsStarted, 1) @@ -62,7 +67,13 @@ func (cl *ConcurrencyLimiter) ServeHTTP(w http.ResponseWriter, r *http.Request) if requestsStarted-completedRequested > cl.maxInflightRequests { // This is a failure pathway, and we do not want to block on the write to finish atomic.AddUint64(&cl.requestsCompleted, 1) + + // Some APIs only return JSON, since we can interfere here and send a plain/text + // message, let's do the right thing so that downstream users can consume it. + w.Header().Add("Content-Type", "text/plain") + w.WriteHeader(http.StatusTooManyRequests) + fmt.Fprintf(w, "Concurrent request limit exceeded. Max concurrent requests: %d\n", cl.maxInflightRequests) return } diff --git a/vendor/modules.txt b/vendor/modules.txt index 606bfdb2..b0486391 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -14,7 +14,7 @@ github.com/golang/protobuf/ptypes/timestamp # github.com/matttproud/golang_protobuf_extensions v1.0.1 ## explicit github.com/matttproud/golang_protobuf_extensions/pbutil -# github.com/openfaas/faas-middleware v1.1.0 +# github.com/openfaas/faas-middleware v1.1.0 => github.com/LucasRoesler/faas-middleware v0.0.0-20221009175005-34767291688a ## explicit; go 1.18 github.com/openfaas/faas-middleware/concurrency-limiter # github.com/prometheus/client_golang v1.11.1 @@ -73,3 +73,4 @@ google.golang.org/protobuf/runtime/protoimpl google.golang.org/protobuf/types/known/anypb google.golang.org/protobuf/types/known/durationpb google.golang.org/protobuf/types/known/timestamppb +# github.com/openfaas/faas-middleware => github.com/LucasRoesler/faas-middleware v0.0.0-20221009175005-34767291688a