Skip to content

Commit

Permalink
Updating grpc handler to gracefully close backend connections
Browse files Browse the repository at this point in the history
This should address part 1 of #807 and #912

update grpc docs
  • Loading branch information
nathanejohnson committed Nov 18, 2022
1 parent 458c701 commit ea9d2f2
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 65 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type Proxy struct {
AuthSchemes map[string]AuthScheme
GRPCMaxRxMsgSize int
GRPCMaxTxMsgSize int
GRPCGShutdownTimeout time.Duration
}

type STSHeader struct {
Expand Down
25 changes: 13 additions & 12 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,19 @@ var defaultConfig = &Config{
},
},
Proxy: Proxy{
MaxConn: 10000,
Strategy: "rnd",
Matcher: "prefix",
NoRouteStatus: 404,
DialTimeout: 30 * time.Second,
FlushInterval: time.Second,
GlobalFlushInterval: 0,
LocalIP: LocalIPString(),
AuthSchemes: map[string]AuthScheme{},
IdleConnTimeout: 15 * time.Second,
GRPCMaxRxMsgSize: 4 * 1024 * 1024, // 4M
GRPCMaxTxMsgSize: 4 * 1024 * 1024, // 4M
MaxConn: 10000,
Strategy: "rnd",
Matcher: "prefix",
NoRouteStatus: 404,
DialTimeout: 30 * time.Second,
FlushInterval: time.Second,
GlobalFlushInterval: 0,
LocalIP: LocalIPString(),
AuthSchemes: map[string]AuthScheme{},
IdleConnTimeout: 15 * time.Second,
GRPCMaxRxMsgSize: 4 * 1024 * 1024, // 4M
GRPCMaxTxMsgSize: 4 * 1024 * 1024, // 4M
GRPCGShutdownTimeout: time.Second * 2,
},
Registry: Registry{
Backend: "consul",
Expand Down
4 changes: 2 additions & 2 deletions config/kvslice.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (

// parseKVSlice parses a configuration string in the form
//
// key=val;key=val,key=val;key=val
// key=val;key=val,key=val;key=val
//
// into a list of string maps. maps are separated by comma and key/value
// pairs within a map are separated by semicolons. The first key/value
// pair of a map can omit the key and its value will be stored under the
// empty key. This allows support of legacy configuration formats which
// are
//
// val;opt1=val1;opt2=val2;...
// val;opt1=val1;opt2=val2;...
func parseKVSlice(in string) ([]map[string]string, error) {
var keyOrFirstVal string
maps := []map[string]string{}
Expand Down
1 change: 1 addition & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
f.BoolVar(&cfg.Proxy.STSHeader.Preload, "proxy.header.sts.preload", defaultConfig.Proxy.STSHeader.Preload, "direct HSTS to pass the preload directive")
f.IntVar(&cfg.Proxy.GRPCMaxRxMsgSize, "proxy.grpcmaxrxmsgsize", defaultConfig.Proxy.GRPCMaxRxMsgSize, "max grpc receive message size (in bytes)")
f.IntVar(&cfg.Proxy.GRPCMaxTxMsgSize, "proxy.grpcmaxtxmsgsize", defaultConfig.Proxy.GRPCMaxTxMsgSize, "max grpc transmit message size (in bytes)")
f.DurationVar(&cfg.Proxy.GRPCGShutdownTimeout, "proxy.grpcshutdowntimeout", defaultConfig.Proxy.GRPCGShutdownTimeout, "amount of time to wait for graceful shutdown of grpc backend")
f.StringVar(&gzipContentTypesValue, "proxy.gzip.contenttype", defaultValues.GZIPContentTypesValue, "regexp of content types to compress")
f.StringVar(&listenerValue, "proxy.addr", defaultValues.ListenerValue, "listener config")
f.StringVar(&certSourcesValue, "proxy.cs", defaultValues.CertSourcesValue, "certificate sources")
Expand Down
37 changes: 19 additions & 18 deletions demo/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,34 @@
//
// During startup the server performs the following steps:
//
// - Add a handler for each prefix which provides a unique
// response for that instance and endpoint
// - Add a `/health` handler for the consul health check
// - Register the service in consul with the listen address,
// a health check under the given name and with one `urlprefix-`
// tag per prefix
// - Install a signal handler to deregister the service on exit
// * Add a handler for each prefix which provides a unique
// response for that instance and endpoint
// * Add a `/health` handler for the consul health check
// * Register the service in consul with the listen address,
// a health check under the given name and with one `urlprefix-`
// tag per prefix
// * Install a signal handler to deregister the service on exit
//
// If the protocol is set to "ws" the registered endpoints function
// as websocket echo servers.
//
// Example:
//
// # http server
// ./server -addr 127.0.0.1:5000 -name svc-a -prefix /foo -prefix /bar
// ./server -addr 127.0.0.1:5001 -name svc-b -prefix /baz -prefix /bar
// ./server -addr 127.0.0.1:5002 -name svc-c -prefix "/gogl redirect=301,https://www.google.de/"
// # http server
// ./server -addr 127.0.0.1:5000 -name svc-a -prefix /foo -prefix /bar
// ./server -addr 127.0.0.1:5001 -name svc-b -prefix /baz -prefix /bar
// ./server -addr 127.0.0.1:5002 -name svc-c -prefix "/gogl redirect=301,https://www.google.de/"
//
// # https server
// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix /foo
// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix "/foo tlsskipverify=true"
// # https server
// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix /foo
// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix "/foo tlsskipverify=true"
//
// # websocket server
// ./server -addr 127.0.0.1:6000 -name ws-a -proto ws -prefix /echo1 -prefix /echo2
// # websocket server
// ./server -addr 127.0.0.1:6000 -name ws-a -proto ws -prefix /echo1 -prefix /echo2
//
// # tcp server
// ./server -addr 127.0.0.1:7000 -name tcp-a -proto tcp -prefix :1234
//
// # tcp server
// ./server -addr 127.0.0.1:7000 -name tcp-a -proto tcp -prefix :1234
package main

import (
Expand Down
10 changes: 10 additions & 0 deletions docs/content/ref/proxy.grpcmaxrxmsgsize.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
title: "proxy.grpcmaxrxmsgsize"
---

`proxy.grpcmaxrxmsgsize` configures the grpc max receive message size in bytes. The default
value is

proxy.grpcmaxrxmsgsize = 4194304

which is 4MB
10 changes: 10 additions & 0 deletions docs/content/ref/proxy.grpcmaxtxmsgsize.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
title: "proxy.grpcmaxtxmsgsize"
---

`proxy.grpcmaxtxmsgsize` configures the grpc max transmit message size in bytes. The default
value is

proxy.grpcmaxtxmsgsize = 4194304

which is 4MB
9 changes: 9 additions & 0 deletions docs/content/ref/proxy.grpcshutdowntimeout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
title: "proxy.grpcshutdowntimeout"
---

`proxy.grpcshutdowntimeout` configures the amount of time fabio will wait to attempt
to close the connection while waiting for grpc traffic to finish to a backend that's been
deregistered. The default value is

proxy.grpcshutdowntimeout = 2s
6 changes: 6 additions & 0 deletions fabio.properties
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,12 @@
# The default is
# proxy.grpcmaxtxmsgsize = 4194304
#
#
# proxy.grpcshutdowntimeout configures the amount of time fabio will wait to attempt
# to close the connection while waiting for grpc traffic to finish to a backend that's been
# deregistered. Default value is
# proxy.grpcshutdowntimeout = 2s
# setting to 0s disables the wait.

# log.access.format configures the format of the access log.
#
Expand Down
63 changes: 32 additions & 31 deletions logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,38 @@
// takes place. Text between two fields is printed verbatim. See the common
// log file formats for an example.
//
// $header.<name> - request http header (name: [a-zA-Z0-9-]+)
// $remote_addr - host:port of remote client
// $remote_host - host of remote client
// $remote_port - port of remote client
// $request - request <method> <uri> <proto>
// $request_args - request query parameters
// $request_host - request host header (aka server name)
// $request_method - request method
// $request_scheme - request scheme
// $request_uri - request URI
// $request_url - request URL
// $request_proto - request protocol
// $response_body_size - response body size in bytes
// $response_status - response status code
// $response_time_ms - response time in S.sss format
// $response_time_us - response time in S.ssssss format
// $response_time_ns - response time in S.sssssssss format
// $time_rfc3339 - log timestamp in YYYY-MM-DDTHH:MM:SSZ format
// $time_rfc3339_ms - log timestamp in YYYY-MM-DDTHH:MM:SS.sssZ format
// $time_rfc3339_us - log timestamp in YYYY-MM-DDTHH:MM:SS.ssssssZ format
// $time_rfc3339_ns - log timestamp in YYYY-MM-DDTHH:MM:SS.sssssssssZ format
// $time_unix_ms - log timestamp in unix epoch ms
// $time_unix_us - log timestamp in unix epoch us
// $time_unix_ns - log timestamp in unix epoch ns
// $time_common - log timestamp in DD/MMM/YYYY:HH:MM:SS -ZZZZ
// $upstream_addr - host:port of upstream server
// $upstream_host - host of upstream server
// $upstream_port - port of upstream server
// $upstream_request_scheme - upstream request scheme
// $upstream_request_uri - upstream request URI
// $upstream_request_url - upstream request URL
// $header.<name> - request http header (name: [a-zA-Z0-9-]+)
// $remote_addr - host:port of remote client
// $remote_host - host of remote client
// $remote_port - port of remote client
// $request - request <method> <uri> <proto>
// $request_args - request query parameters
// $request_host - request host header (aka server name)
// $request_method - request method
// $request_scheme - request scheme
// $request_uri - request URI
// $request_url - request URL
// $request_proto - request protocol
// $response_body_size - response body size in bytes
// $response_status - response status code
// $response_time_ms - response time in S.sss format
// $response_time_us - response time in S.ssssss format
// $response_time_ns - response time in S.sssssssss format
// $time_rfc3339 - log timestamp in YYYY-MM-DDTHH:MM:SSZ format
// $time_rfc3339_ms - log timestamp in YYYY-MM-DDTHH:MM:SS.sssZ format
// $time_rfc3339_us - log timestamp in YYYY-MM-DDTHH:MM:SS.ssssssZ format
// $time_rfc3339_ns - log timestamp in YYYY-MM-DDTHH:MM:SS.sssssssssZ format
// $time_unix_ms - log timestamp in unix epoch ms
// $time_unix_us - log timestamp in unix epoch us
// $time_unix_ns - log timestamp in unix epoch ns
// $time_common - log timestamp in DD/MMM/YYYY:HH:MM:SS -ZZZZ
// $upstream_addr - host:port of upstream server
// $upstream_host - host of upstream server
// $upstream_port - port of upstream server
// $upstream_request_scheme - upstream request scheme
// $upstream_request_uri - upstream request URI
// $upstream_request_url - upstream request URL
//
package logger

import (
Expand Down
11 changes: 9 additions & 2 deletions proxy/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,21 @@ func (p *grpcConnectionPool) cleanup() {
p.lock.Lock()
table := route.GetTable()
for tKey, cs := range p.connections {
if cs.GetState() == connectivity.Shutdown {
state := cs.GetState()
if state == connectivity.Shutdown {
delete(p.connections, tKey)
continue
}

if !hasTarget(tKey, table) {
log.Println("[DEBUG] grpc: cleaning up connection to", tKey)
cs.Close()
go func(cs *grpc.ClientConn, state connectivity.State) {
ctx, cancel := context.WithTimeout(context.Background(), p.cfg.Proxy.GRPCGShutdownTimeout)
defer cancel()
// wait for state to change, or timeout, before closing, in case it's still handling traffic.
cs.WaitForStateChange(ctx, state)
cs.Close()
}(cs, state)
delete(p.connections, tKey)
}
}
Expand Down

0 comments on commit ea9d2f2

Please sign in to comment.