Skip to content

Commit

Permalink
Implement graceful shutdown
Browse files Browse the repository at this point in the history
- As outlined in openfaas/faas#873 and
in released 0.9.4 of the classic watchdog. This commit
dual-maintains the required changes to allow Kubernetes to remove
traffic in a graceful way meaning we will not see connection
refused errors and in-flight HTTP requests still get a chance to
finish.

Signed-off-by: Alex Ellis (VMware) <alexellis2@gmail.com>
  • Loading branch information
alexellis committed Sep 17, 2018
1 parent 10751f5 commit 3a2e7bf
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 32 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ Environmental variables:
| `read_timeout` | Yes | HTTP timeout for reading the payload from the client caller (in seconds) |
| `write_timeout` | Yes | HTTP timeout for writing a response body from your function (in seconds) |
| `exec_timeout` | Yes | Exec timeout for process exec'd for each incoming request (in seconds). Disabled if set to 0. |
| `port` | Yes | Specify an alternative TCP port fo testing |
| `port` | Yes | Specify an alternative TCP port for testing |
| `write_debug` | No | Write all output, error messages, and additional information to the logs. Default is false. |
| `content_type` | Yes | Force a specific Content-Type response for all responses - only in forking/serializing modes. |
| `suppress_lock` | No | The watchdog will attempt to write a lockfile to /tmp/ for swarm healthchecks - set this to true to disable behaviour. |
| `suppress_lock` | Yes | The watchdog will attempt to write a lockfile to /tmp/ for swarm healthchecks - set this to true to disable behaviour. |
| `upstream_url` | Yes | `http` mode only - where to forward requests i.e. 127.0.0.1:5000 |

> Note: the .lock file is implemented for health-checking, but cannot be disabled yet. You must create this file in /tmp/.
22 changes: 21 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type WatchdogConfig struct {
ContentType string
InjectCGIHeaders bool
OperationalMode int
SuppressLock bool
UpstreamURL string
}

// Process returns a string for the process and a slice for the arguments from the FunctionProcess.
Expand All @@ -36,7 +38,11 @@ func New(env []string) (WatchdogConfig, error) {

envMap := mapEnv(env)

var functionProcess string
var (
functionProcess string
upstreamURL string
)

if val, exists := envMap["fprocess"]; exists {
functionProcess = val
}
Expand All @@ -45,6 +51,10 @@ func New(env []string) (WatchdogConfig, error) {
functionProcess = val
}

if val, exists := envMap["upstream_url"]; exists {
upstreamURL = val
}

contentType := "application/octet-stream"
if val, exists := envMap["content_type"]; exists {
contentType = val
Expand All @@ -59,6 +69,8 @@ func New(env []string) (WatchdogConfig, error) {
ExecTimeout: getDuration(envMap, "exec_timeout", time.Second*10),
OperationalMode: ModeStreaming,
ContentType: contentType,
SuppressLock: getBool(envMap, "suppress_lock"),
UpstreamURL: upstreamURL,
}

if val := envMap["mode"]; len(val) > 0 {
Expand Down Expand Up @@ -103,3 +115,11 @@ func getInt(env map[string]string, key string, defaultValue int) int {

return result
}

func getBool(env map[string]string, key string) bool {
if env[key] == "true" {
return true
}

return false
}
8 changes: 0 additions & 8 deletions executor/http_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"net"
"net/http"
"net/url"
"os"
"os/exec"
"sync"
"time"
Expand Down Expand Up @@ -83,13 +82,6 @@ func (f *HTTPFunctionRunner) Start() error {

f.Client = makeProxyClient(f.ExecTimeout)

urlValue, upstreamURLErr := url.Parse(os.Getenv("upstream_url"))
if upstreamURLErr != nil {
log.Fatal(upstreamURLErr)
}

f.UpstreamURL = urlValue

return cmd.Start()
}

Expand Down
120 changes: 105 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
package main

import (
"context"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"os/signal"
"path/filepath"
"strings"
"sync/atomic"
"syscall"
"time"

"github.com/openfaas-incubator/of-watchdog/config"
"github.com/openfaas-incubator/of-watchdog/executor"
)

var (
acceptingConnections int32
)

func main() {
atomic.StoreInt32(&acceptingConnections, 0)

watchdogConfig, configErr := config.New(os.Environ())
if configErr != nil {
fmt.Fprintf(os.Stderr, configErr.Error())
Expand All @@ -25,24 +37,86 @@ func main() {
os.Exit(-1)
}

requestHandler := buildRequestHandler(watchdogConfig)

log.Printf("OperationalMode: %s\n", config.WatchdogMode(watchdogConfig.OperationalMode))

http.HandleFunc("/", requestHandler)
http.HandleFunc("/_/health", makeHealthHandler())

shutdownTimeout := watchdogConfig.HTTPWriteTimeout

s := &http.Server{
Addr: fmt.Sprintf(":%d", watchdogConfig.TCPPort),
ReadTimeout: watchdogConfig.HTTPReadTimeout,
WriteTimeout: watchdogConfig.HTTPWriteTimeout,
MaxHeaderBytes: 1 << 20, // Max header of 1MB
}

requestHandler := buildRequestHandler(watchdogConfig)
listenUntilShutdown(shutdownTimeout, s, watchdogConfig.SuppressLock)
}

log.Printf("OperationalMode: %s\n", config.WatchdogMode(watchdogConfig.OperationalMode))
func markUnhealthy() error {
atomic.StoreInt32(&acceptingConnections, 0)

path := filepath.Join(os.TempDir(), ".lock")
log.Printf("Removing lock-file : %s\n", path)
removeErr := os.Remove(path)
return removeErr
}

func listenUntilShutdown(shutdownTimeout time.Duration, s *http.Server, suppressLock bool) {

if err := lock(); err != nil {
log.Panic(err.Error())
idleConnsClosed := make(chan struct{})
go func() {
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGTERM)

<-sig

log.Printf("SIGTERM received.. shutting down server in %s\n", shutdownTimeout.String())

healthErr := markUnhealthy()

if healthErr != nil {
log.Printf("Unable to mark unhealthy during shutdown: %s\n", healthErr.Error())
}

<-time.Tick(shutdownTimeout)

if err := s.Shutdown(context.Background()); err != nil {
// Error from closing listeners, or context timeout:
log.Printf("Error in Shutdown: %v", err)
}

log.Printf("No new connections allowed. Exiting in: %s\n", shutdownTimeout.String())

<-time.Tick(shutdownTimeout)

close(idleConnsClosed)
}()

// Run the HTTP server in a separate go-routine.
go func() {
if err := s.ListenAndServe(); err != http.ErrServerClosed {
log.Printf("Error ListenAndServe: %v", err)
close(idleConnsClosed)
}
}()

if suppressLock == false {
path, writeErr := createLockFile()

if writeErr != nil {
log.Panicf("Cannot write %s. To disable lock-file set env suppress_lock=true.\n Error: %s.\n", path, writeErr.Error())
}
} else {
log.Println("Warning: \"suppress_lock\" is enabled. No automated health-checks will be in place for your function.")

atomic.StoreInt32(&acceptingConnections, 1)
}

http.HandleFunc("/", requestHandler)
http.HandleFunc("/_/health", makeHealthHandler())
log.Fatal(s.ListenAndServe())
<-idleConnsClosed
}

func buildRequestHandler(watchdogConfig config.WatchdogConfig) http.HandlerFunc {
Expand All @@ -69,11 +143,16 @@ func buildRequestHandler(watchdogConfig config.WatchdogConfig) http.HandlerFunc
return requestHandler
}

func lock() error {
lockFile := filepath.Join(os.TempDir(), ".lock")
log.Printf("Writing lock file at: %s", lockFile)
return ioutil.WriteFile(lockFile, nil, 0600)
// createLockFile returns a path to a lock file and/or an error
// if the file could not be created.
func createLockFile() (string, error) {
path := filepath.Join(os.TempDir(), ".lock")
log.Printf("Writing lock-file to: %s\n", path)
writeErr := ioutil.WriteFile(path, []byte{}, 0660)

atomic.StoreInt32(&acceptingConnections, 1)

return path, writeErr
}

func makeAfterBurnRequestHandler(watchdogConfig config.WatchdogConfig) func(http.ResponseWriter, *http.Request) {
Expand All @@ -84,7 +163,7 @@ func makeAfterBurnRequestHandler(watchdogConfig config.WatchdogConfig) func(http
ProcessArgs: arguments,
}

fmt.Printf("Forking - %s %s\n", commandName, arguments)
log.Printf("Forking %s %s\n", commandName, arguments)
functionInvoker.Start()

return func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -203,6 +282,16 @@ func makeHTTPRequestHandler(watchdogConfig config.WatchdogConfig) func(http.Resp
ProcessArgs: arguments,
}

if len(watchdogConfig.UpstreamURL) == 0 {
log.Fatal(`For mode=http you must specify a valid URL for "upstream_url"`)
}

urlValue, upstreamURLErr := url.Parse(watchdogConfig.UpstreamURL)
if upstreamURLErr != nil {
log.Fatal(upstreamURLErr)
}
functionInvoker.UpstreamURL = urlValue

fmt.Printf("Forking - %s %s\n", commandName, arguments)
functionInvoker.Start()

Expand Down Expand Up @@ -237,17 +326,18 @@ func lockFilePresent() bool {
return true
}

func makeHealthHandler() func(w http.ResponseWriter, r *http.Request) {
func makeHealthHandler() func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
if lockFilePresent() == false {
w.WriteHeader(http.StatusInternalServerError)
if atomic.LoadInt32(&acceptingConnections) == 0 || lockFilePresent() == false {
w.WriteHeader(http.StatusServiceUnavailable)
return
}

w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))

break
default:
w.WriteHeader(http.StatusMethodNotAllowed)
Expand Down
14 changes: 8 additions & 6 deletions requesthandler_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"log"
"net/http"
"net/http/httptest"
"os"
Expand All @@ -13,12 +14,14 @@ func TestHealthHandler_StatusOK_LockFilePresent(t *testing.T) {

present := lockFilePresent()

if present == false {
if err := lock(); err != nil {
t.Fatal(err)
}
if present {
path := filepath.Join(os.TempDir(), ".lock")
os.Remove(path)
}

if tmpPath, err := createLockFile(); err != nil {
log.Fatalf("Error writing to %s - %s\n", tmpPath, err)
}
req, err := http.NewRequest(http.MethodGet, "/_/health", nil)
if err != nil {
t.Fatal(err)
Expand All @@ -30,7 +33,6 @@ func TestHealthHandler_StatusOK_LockFilePresent(t *testing.T) {
if status := rr.Code; status != required {
t.Errorf("handler returned wrong status code - want: %v, got: %v", required, status)
}

}

func TestHealthHandler_StatusInternalServerError_LockFileNotPresent(t *testing.T) {
Expand All @@ -49,7 +51,7 @@ func TestHealthHandler_StatusInternalServerError_LockFileNotPresent(t *testing.T
handler := makeHealthHandler()
handler(rr, req)

required := http.StatusInternalServerError
required := http.StatusServiceUnavailable
if status := rr.Code; status != required {
t.Errorf("handler returned wrong status code - want: %v, got: %v", required, status)
}
Expand Down

0 comments on commit 3a2e7bf

Please sign in to comment.