Skip to content

Commit

Permalink
fix: custom ready checks should not count to the concurrency
Browse files Browse the repository at this point in the history
Document why we use the base function handler to provide custom ready
checks to all function modes. Additionally, make sure that the unwrapped
function handler is passed to the readiness check. This ensures that
readiness checks do not count toward the concurrency limit

Review by AE - reverts change to faas-middleware, by introducing an
interface.

Closes: #145

Signed-off-by: Lucas Roesler <roesler.lucas@gmail.com>
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
  • Loading branch information
LucasRoesler authored and alexellis committed Oct 14, 2022
1 parent e479ad7 commit 0b79385
Show file tree
Hide file tree
Showing 429 changed files with 57,509 additions and 22,799 deletions.
2 changes: 1 addition & 1 deletion executor/forking_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type StreamingFunctionRunner struct {

// Run run a fork for each invocation
func (f *StreamingFunctionRunner) Run(req FunctionRequest) error {
log.Printf("Running: %s", req.Process)
log.Printf("Running: %s - %s", req.Process, req.Path)
start := time.Now()

var cmd *exec.Cmd
Expand Down
1 change: 1 addition & 0 deletions executor/function_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type FunctionRunner interface {

// FunctionRequest stores request for function execution
type FunctionRequest struct {
Path string
Process string
ProcessArgs []string
Environment []string
Expand Down
21 changes: 10 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,19 @@ module github.com/openfaas/of-watchdog
go 1.18

require (
github.com/openfaas/faas-middleware v1.1.0
github.com/prometheus/client_golang v1.11.1
github.com/openfaas/faas-middleware v1.2.2
github.com/prometheus/client_golang v1.13.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/golang/protobuf v1.4.3 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 // indirect
google.golang.org/protobuf v1.26.0-rc.1 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43 // indirect
google.golang.org/protobuf v1.28.1 // indirect
)

replace github.com/openfaas/faas-middleware => github.com/LucasRoesler/faas-middleware v0.0.0-20221009175005-34767291688a
369 changes: 357 additions & 12 deletions go.sum

Large diffs are not rendered by default.

95 changes: 32 additions & 63 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,21 @@ func main() {

watchdogConfig, err := config.New(os.Environ())
if err != nil {
fmt.Fprintf(os.Stderr, "%s", err.Error())
fmt.Fprintf(os.Stderr, "Error loading config: %s", err.Error())
os.Exit(1)
}

if watchdogConfig.ReadyEndpoint != "" {
log.Printf("Using function ready endpoint: %q", watchdogConfig.ReadyEndpoint)
}
// baseFunctionHandler is the function invoker without any other middlewares.
// It is used to provide a generic way to implement the readiness checks regardless
// of the request mode.
baseFunctionHandler := buildRequestHandler(watchdogConfig, watchdogConfig.PrefixLogs)
requestHandler := baseFunctionHandler

requestHandler := buildRequestHandler(watchdogConfig, watchdogConfig.PrefixLogs)
var limit *limiter.ConcurrencyLimiter
var limit limiter.Limiter
if watchdogConfig.MaxInflight > 0 {
limit = limiter.NewConcurrencyLimiter(requestHandler, watchdogConfig.MaxInflight)
requestHandler = limit.Handler()
requestLimiter := limiter.NewConcurrencyLimiter(requestHandler, watchdogConfig.MaxInflight)
requestHandler = requestLimiter.Handler()
limit = requestLimiter
}

log.Printf("Watchdog mode: %s\n", config.WatchdogMode(watchdogConfig.OperationalMode))
Expand All @@ -82,7 +84,9 @@ func main() {
http.HandleFunc("/", metrics.InstrumentHandler(requestHandler, httpMetrics))
http.HandleFunc("/_/health", makeHealthHandler())
http.Handle("/_/ready", &readiness{
functionHandler: requestHandler,
// make sure to pass original handler, before it's been wrapped by
// the limiter
functionHandler: baseFunctionHandler,
endpoint: watchdogConfig.ReadyEndpoint,
lockCheck: lockFilePresent,
limiter: limit,
Expand Down Expand Up @@ -238,6 +242,10 @@ func makeSerializingForkRequestHandler(watchdogConfig config.WatchdogConfig, log
environment = getEnvironment(r)
}

path := "/"
if r.URL != nil {
path = r.URL.Path
}
commandName, arguments := watchdogConfig.Process()
req := executor.FunctionRequest{
Process: commandName,
Expand All @@ -246,6 +254,7 @@ func makeSerializingForkRequestHandler(watchdogConfig config.WatchdogConfig, log
ContentLength: &r.ContentLength,
OutputWriter: w,
Environment: environment,
Path: path,
}

w.Header().Set("Content-Type", watchdogConfig.ContentType)
Expand All @@ -271,23 +280,27 @@ func makeStreamingRequestHandler(watchdogConfig config.WatchdogConfig, prefixLog
environment = getEnvironment(r)
}

path := "/"
if r.URL != nil {
path = r.URL.Path
}
commandName, arguments := watchdogConfig.Process()
req := executor.FunctionRequest{
Process: commandName,
ProcessArgs: arguments,
InputReader: r.Body,
OutputWriter: w,
Environment: environment,
Path: path,
}

w.Header().Set("Content-Type", watchdogConfig.ContentType)
err := functionInvoker.Run(req)
if err != nil {
log.Println(err.Error())

// Probably cannot write to client if we already have written a header
// w.WriteHeader(500)
// w.Write([]byte(err.Error()))
// Cannot write a status code to the client because we
// already have written a header
}
}
}
Expand Down Expand Up @@ -344,11 +357,16 @@ func makeHTTPRequestHandler(watchdogConfig config.WatchdogConfig, prefixLogs boo

return func(w http.ResponseWriter, r *http.Request) {

path := "/"
if r.URL != nil {
path = r.URL.Path
}
req := executor.FunctionRequest{
Process: commandName,
ProcessArgs: arguments,
InputReader: r.Body,
OutputWriter: w,
Path: path,
}

if r.Body != nil {
Expand All @@ -373,67 +391,18 @@ func makeStaticRequestHandler(watchdogConfig config.WatchdogConfig) http.Handler

func lockFilePresent() bool {
path := filepath.Join(os.TempDir(), ".lock")

if _, err := os.Stat(path); os.IsNotExist(err) {
return false
}
return true
}

type readiness struct {
functionHandler http.Handler
endpoint string
lockCheck func() bool
limiter Limiter
}

func (r *readiness) ServeHTTP(w http.ResponseWriter, req *http.Request) {
switch req.Method {
case http.MethodGet:
status := http.StatusOK

switch {
case atomic.LoadInt32(&acceptingConnections) == 0, !r.lockCheck():
status = http.StatusServiceUnavailable
case r.limiter.Met():
status = http.StatusTooManyRequests
case r.endpoint != "":
upstream := url.URL{
Scheme: req.URL.Scheme,
Host: req.URL.Host,
Path: r.endpoint,
}

readyReq, err := http.NewRequestWithContext(req.Context(), http.MethodGet, upstream.String(), nil)
if err != nil {
log.Printf("Error creating readiness request: %s", err)
status = http.StatusInternalServerError
break
}

// we need to set the raw RequestURI for the function invoker to see our URL path,
// otherwise it will just route to `/`, typically this shouldn't be used or set
readyReq.RequestURI = r.endpoint
readyReq.Header = req.Header.Clone()
r.functionHandler.ServeHTTP(w, readyReq)
return
}

w.WriteHeader(status)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
}

type Limiter interface {
Met() bool
return true
}

func makeHealthHandler() func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
if atomic.LoadInt32(&acceptingConnections) == 0 || !lockFilePresent() {
if atomic.LoadInt32(&acceptingConnections) == 0 || lockFilePresent() == false {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
Expand Down
74 changes: 74 additions & 0 deletions readiness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package main

import (
"log"
"net/http"
"net/url"
"sync/atomic"

limiter "github.com/openfaas/faas-middleware/concurrency-limiter"
)

type readiness struct {
// functionHandler is the function invoke HTTP Handler. Using this allows
// custom ready checks in all invoke modes. For example, in forking mode
// the handler implementation (a bash script) can check the path in the env
// and respond accordingly, exit non-zero when not ready.
functionHandler http.Handler
endpoint string
lockCheck func() bool
limiter limiter.Limiter
}

// LimitMet returns true if the concurrency limit has been reached
// or false if no limiter has been used
func (r *readiness) LimitMet() bool {
if r.limiter == nil {
return false
}
return r.limiter.Met()
}

func (r *readiness) ServeHTTP(w http.ResponseWriter, req *http.Request) {
switch req.Method {
case http.MethodGet:
status := http.StatusOK

switch {
case atomic.LoadInt32(&acceptingConnections) == 0, !r.lockCheck():
status = http.StatusServiceUnavailable
case r.LimitMet():
log.Println("Limited")

status = http.StatusTooManyRequests
case r.endpoint != "":
upstream := url.URL{
Scheme: req.URL.Scheme,
Host: req.URL.Host,
Path: r.endpoint,
}

readyReq, err := http.NewRequestWithContext(req.Context(), http.MethodGet, upstream.String(), nil)
if err != nil {
log.Printf("Error creating readiness request to: %s : %s", upstream.String(), err)
status = http.StatusInternalServerError
break
}

// we need to set the raw RequestURI for the function invoker to see our URL path,
// otherwise it will just route to `/`, typically this shouldn't be used or set
readyReq.RequestURI = r.endpoint
readyReq.Header = req.Header.Clone()

// Instead of calling http.DefaultClient.Do(), which only works with http mode
// calling this handler can fork a process to run a request, such as when
// using bash as the function.
r.functionHandler.ServeHTTP(w, readyReq)
return
}

w.WriteHeader(status)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
}
8 changes: 0 additions & 8 deletions vendor/github.com/cespare/xxhash/v2/.travis.yml

This file was deleted.

6 changes: 4 additions & 2 deletions vendor/github.com/cespare/xxhash/v2/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion vendor/github.com/cespare/xxhash/v2/xxhash.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0b79385

Please sign in to comment.