Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating grpc handler to gracefully close backend connections #913

Merged
merged 1 commit into from
Dec 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type Proxy struct {
AuthSchemes map[string]AuthScheme
GRPCMaxRxMsgSize int
GRPCMaxTxMsgSize int
GRPCGShutdownTimeout time.Duration
}

type STSHeader struct {
Expand Down
25 changes: 13 additions & 12 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,19 @@ var defaultConfig = &Config{
},
},
Proxy: Proxy{
MaxConn: 10000,
Strategy: "rnd",
Matcher: "prefix",
NoRouteStatus: 404,
DialTimeout: 30 * time.Second,
FlushInterval: time.Second,
GlobalFlushInterval: 0,
LocalIP: LocalIPString(),
AuthSchemes: map[string]AuthScheme{},
IdleConnTimeout: 15 * time.Second,
GRPCMaxRxMsgSize: 4 * 1024 * 1024, // 4M
GRPCMaxTxMsgSize: 4 * 1024 * 1024, // 4M
MaxConn: 10000,
Strategy: "rnd",
Matcher: "prefix",
NoRouteStatus: 404,
DialTimeout: 30 * time.Second,
FlushInterval: time.Second,
GlobalFlushInterval: 0,
LocalIP: LocalIPString(),
AuthSchemes: map[string]AuthScheme{},
IdleConnTimeout: 15 * time.Second,
GRPCMaxRxMsgSize: 4 * 1024 * 1024, // 4M
GRPCMaxTxMsgSize: 4 * 1024 * 1024, // 4M
GRPCGShutdownTimeout: time.Second * 2,
},
Registry: Registry{
Backend: "consul",
Expand Down
1 change: 1 addition & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
f.BoolVar(&cfg.Proxy.STSHeader.Preload, "proxy.header.sts.preload", defaultConfig.Proxy.STSHeader.Preload, "direct HSTS to pass the preload directive")
f.IntVar(&cfg.Proxy.GRPCMaxRxMsgSize, "proxy.grpcmaxrxmsgsize", defaultConfig.Proxy.GRPCMaxRxMsgSize, "max grpc receive message size (in bytes)")
f.IntVar(&cfg.Proxy.GRPCMaxTxMsgSize, "proxy.grpcmaxtxmsgsize", defaultConfig.Proxy.GRPCMaxTxMsgSize, "max grpc transmit message size (in bytes)")
f.DurationVar(&cfg.Proxy.GRPCGShutdownTimeout, "proxy.grpcshutdowntimeout", defaultConfig.Proxy.GRPCGShutdownTimeout, "amount of time to wait for graceful shutdown of grpc backend")
f.StringVar(&gzipContentTypesValue, "proxy.gzip.contenttype", defaultValues.GZIPContentTypesValue, "regexp of content types to compress")
f.StringVar(&listenerValue, "proxy.addr", defaultValues.ListenerValue, "listener config")
f.StringVar(&certSourcesValue, "proxy.cs", defaultValues.CertSourcesValue, "certificate sources")
Expand Down
10 changes: 10 additions & 0 deletions docs/content/ref/proxy.grpcmaxrxmsgsize.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
title: "proxy.grpcmaxrxmsgsize"
---

`proxy.grpcmaxrxmsgsize` configures the grpc max receive message size in bytes. The default
value is

proxy.grpcmaxrxmsgsize = 4194304

which is 4MB
10 changes: 10 additions & 0 deletions docs/content/ref/proxy.grpcmaxtxmsgsize.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
title: "proxy.grpcmaxtxmsgsize"
---

`proxy.grpcmaxtxmsgsize` configures the grpc max transmit message size in bytes. The default
value is

proxy.grpcmaxtxmsgsize = 4194304

which is 4MB
9 changes: 9 additions & 0 deletions docs/content/ref/proxy.grpcshutdowntimeout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
title: "proxy.grpcshutdowntimeout"
---

`proxy.grpcshutdowntimeout` configures the amount of time fabio will wait to attempt
to close the connection while waiting for grpc traffic to finish to a backend that's been
deregistered. The default value is

proxy.grpcshutdowntimeout = 2s
6 changes: 6 additions & 0 deletions fabio.properties
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,12 @@
# The default is
# proxy.grpcmaxtxmsgsize = 4194304
#
#
# proxy.grpcshutdowntimeout configures the amount of time fabio will wait to attempt
# to close the connection while waiting for grpc traffic to finish to a backend that's been
# deregistered. Default value is
# proxy.grpcshutdowntimeout = 2s
# setting to 0s disables the wait.

# log.access.format configures the format of the access log.
#
Expand Down
11 changes: 9 additions & 2 deletions proxy/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,21 @@ func (p *grpcConnectionPool) cleanup() {
p.lock.Lock()
table := route.GetTable()
for tKey, cs := range p.connections {
if cs.GetState() == connectivity.Shutdown {
state := cs.GetState()
if state == connectivity.Shutdown {
delete(p.connections, tKey)
continue
}

if !hasTarget(tKey, table) {
log.Println("[DEBUG] grpc: cleaning up connection to", tKey)
cs.Close()
go func(cs *grpc.ClientConn, state connectivity.State) {
ctx, cancel := context.WithTimeout(context.Background(), p.cfg.Proxy.GRPCGShutdownTimeout)
defer cancel()
// wait for state to change, or timeout, before closing, in case it's still handling traffic.
cs.WaitForStateChange(ctx, state)
cs.Close()
}(cs, state)
delete(p.connections, tKey)
}
}
Expand Down