From 9b4a9e8f71580ba8a5ade52f3185f1b7c2d8726a Mon Sep 17 00:00:00 2001 From: Vladimir Dementyev Date: Tue, 14 Jan 2020 18:50:14 -0500 Subject: [PATCH] refactor(rpc): replace pool with single connection implementation --- CHANGELOG.md | 9 +++ cli/cli.go | 2 + rpc/config.go | 7 ++- rpc/rpc.go | 148 ++++++++++++++++++++++++++++---------------------- 4 files changed, 99 insertions(+), 67 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e601073a..16487b40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,15 @@ ## 1.0.0-dev +- Use single gRPC client instance instead of a pool. ([@palkan][]) +
+gRPC connection provides concurrency via H2 streams (with load balancing). Using a pool doesn't bring any performance
+improvements and sometimes causes instability (e.g., ResourceExhausted or Unavailable exceptions under the load).
+
+We still limit the number of concurrent RPC requests. Now you can configure it via `--rpc_concurrency` setting.
+
+See [PR#88](https://github.com/anycable/anycable-go/pull/88) for more.
+
 - Add `--disconnect_timeout` option to specify the timeout for graceful shutdown of the disconnect queue. ([@palkan][]) - Add `mem_sys_bytes` metric. 
([@palkan][]) diff --git a/cli/cli.go b/cli/cli.go index 36f19336..967c9e64 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -56,6 +56,7 @@ func init() { fs.StringVar(&defaults.RedisChannel, "redis_channel", "__anycable__", "") fs.StringVar(&defaults.RPC.Host, "rpc_host", "localhost:50051", "") + fs.IntVar(&defaults.RPC.Concurrency, "rpc_concurrency", 28, "") fs.StringVar(&headers, "headers", "cookie", "") fs.IntVar(&defaults.WS.ReadBufferSize, "read_buffer_size", 1024, "") @@ -123,6 +124,7 @@ OPTIONS --redis_channel Redis channel for broadcasts, default: __anycable__, env: ANYCABLE_REDIS_CHANNEL --rpc_host RPC service address, default: localhost:50051, env: ANYCABLE_RPC_HOST + --rpc_concurrency Max number of concurrent RPC requests; should be slightly less than the RPC server concurrency, default: 28, env: ANYCABLE_RPC_CONCURRENCY --headers List of headers to proxy to RPC, default: cookie, env: ANYCABLE_HEADERS --disconnect_rate Max number of Disconnect calls per second, default: 100, env: ANYCABLE_DISCONNECT_RATE diff --git a/rpc/config.go b/rpc/config.go index 92b7817b..a662536b 100644 --- a/rpc/config.go +++ b/rpc/config.go @@ -2,10 +2,15 @@ package rpc // Config contains RPC controller configuration type Config struct { + // RPC instance host Host string + // The max number of simultaneous requests. 
+ // Should be slightly less than the RPC server concurrency to avoid + // ResourceExhausted errors + Concurrency int } // NewConfig builds a new config func NewConfig() Config { - return Config{} + return Config{Concurrency: 28} } diff --git a/rpc/rpc.go b/rpc/rpc.go index 1b96a9b6..6d3d43df 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -4,24 +4,26 @@ import ( "context" "errors" "fmt" + "math" "time" "github.com/anycable/anycable-go/common" "github.com/anycable/anycable-go/metrics" "github.com/apex/log" - grpcpool "github.com/anycable/anycable-go/pool" pb "github.com/anycable/anycable-go/protos" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) const ( - retryInterval = 500 invokeTimeout = 3000 - initialCapacity = 5 - maxCapacity = 50 + retryExhaustedInterval = 10 + retryUnavailableInterval = 100 metricsRPCCalls = "rpc_call_total" metricsRPCFailures = "rpc_error_total" @@ -30,7 +32,8 @@ const ( // Controller implements node.Controller interface for gRPC type Controller struct { config *Config - pool grpcpool.Pool + sem chan (struct{}) + conn *grpc.ClientConn metrics *metrics.Metrics log *log.Entry } @@ -46,30 +49,42 @@ func NewController(metrics *metrics.Metrics, config *Config) *Controller { // Start initializes RPC connection pool func (c *Controller) Start() error { host := c.config.Host + capacity := c.config.Concurrency - factory := func() (*grpc.ClientConn, error) { - return grpc.Dial(host, grpc.WithInsecure()) + kacp := keepalive.ClientParameters{ + Time: 10 * time.Second, // send pings every 10 seconds if there is no activity + PermitWithoutStream: true, // send pings even without active streams } - pool, err := grpcpool.NewChannelPool(initialCapacity, maxCapacity, factory) + conn, err := grpc.Dial( + host, + grpc.WithInsecure(), + grpc.WithKeepaliveParams(kacp), + grpc.WithBalancerName("round_robin"), + ) + + c.sem = make(chan struct{}, 
capacity) + for i := 0; i < capacity; i++ { + c.sem <- struct{}{} + } if err == nil { - c.log.Infof("RPC pool initialized: %s", host) + c.log.Infof("RPC controller initialized: %s (concurrency %d)", host, capacity) } - c.pool = pool + c.conn = conn return err } // Shutdown closes connections func (c *Controller) Shutdown() error { - if c.pool == nil { + if c.conn == nil { return nil } - c.pool.Close() + defer c.conn.Close() - busy := c.pool.Busy() + busy := c.busy() if busy > 0 { c.log.Infof("Waiting for active RPC calls to finish: %d", busy) @@ -77,7 +92,7 @@ func (c *Controller) Shutdown() error { // Wait for active connections _, err := retry(func() (interface{}, error) { - busy := c.pool.Busy() + busy := c.busy() if busy > 0 { return false, fmt.Errorf("There are %d active RPC connections left", busy) @@ -92,15 +107,10 @@ func (c *Controller) Shutdown() error { // Authenticate performs Connect RPC call func (c *Controller) Authenticate(sid string, path string, headers *map[string]string) (string, []string, error) { - conn, err := c.getConn() - - if err != nil { - return "", nil, err - } + <-c.sem + defer func() { c.sem <- struct{}{} }() - defer conn.Close() - - client := pb.NewRPCClient(conn.Conn) + client := pb.NewRPCClient(c.conn) op := func() (interface{}, error) { return client.Connect(newContext(sid), &pb.ConnectionRequest{Path: path, Headers: *headers}) @@ -134,15 +144,10 @@ func (c *Controller) Authenticate(sid string, path string, headers *map[string]s // Subscribe performs Command RPC call with "subscribe" command func (c *Controller) Subscribe(sid string, id string, channel string) (*common.CommandResult, error) { - conn, err := c.getConn() - - if err != nil { - return nil, err - } + <-c.sem + defer func() { c.sem <- struct{}{} }() - defer conn.Close() - - client := pb.NewRPCClient(conn.Conn) + client := pb.NewRPCClient(c.conn) op := func() (interface{}, error) { return client.Command(newContext(sid), &pb.CommandMessage{Command: "subscribe", 
Identifier: channel, ConnectionIdentifiers: id}) @@ -155,15 +160,10 @@ func (c *Controller) Subscribe(sid string, id string, channel string) (*common.C // Unsubscribe performs Command RPC call with "unsubscribe" command func (c *Controller) Unsubscribe(sid string, id string, channel string) (*common.CommandResult, error) { - conn, err := c.getConn() - - if err != nil { - return nil, err - } + <-c.sem + defer func() { c.sem <- struct{}{} }() - defer conn.Close() - - client := pb.NewRPCClient(conn.Conn) + client := pb.NewRPCClient(c.conn) op := func() (interface{}, error) { return client.Command(newContext(sid), &pb.CommandMessage{Command: "unsubscribe", Identifier: channel, ConnectionIdentifiers: id}) @@ -176,15 +176,10 @@ func (c *Controller) Unsubscribe(sid string, id string, channel string) (*common // Perform performs Command RPC call with "perform" command func (c *Controller) Perform(sid string, id string, channel string, data string) (*common.CommandResult, error) { - conn, err := c.getConn() - - if err != nil { - return nil, err - } + <-c.sem + defer func() { c.sem <- struct{}{} }() - defer conn.Close() - - client := pb.NewRPCClient(conn.Conn) + client := pb.NewRPCClient(c.conn) op := func() (interface{}, error) { return client.Command(newContext(sid), &pb.CommandMessage{Command: "message", Identifier: channel, ConnectionIdentifiers: id, Data: data}) @@ -197,15 +192,10 @@ func (c *Controller) Perform(sid string, id string, channel string, data string) // Disconnect performs disconnect RPC call func (c *Controller) Disconnect(sid string, id string, subscriptions []string, path string, headers *map[string]string) error { - conn, err := c.getConn() - - if err != nil { - return err - } - - defer conn.Close() + <-c.sem + defer func() { c.sem <- struct{}{} }() - client := pb.NewRPCClient(conn.Conn) + client := pb.NewRPCClient(c.conn) op := func() (interface{}, error) { return client.Disconnect(newContext(sid), &pb.DisconnectRequest{Identifiers: id, Subscriptions: 
subscriptions, Path: path, Headers: *headers}) @@ -266,31 +256,57 @@ func (c *Controller) parseCommandResponse(response interface{}, err error) (*com return nil, errors.New("Failed to deserialize command response") } -func (c *Controller) getConn() (*grpcpool.Conn, error) { - conn, err := c.pool.Get() - - if err != nil { - return nil, err - } - - return &conn, nil +func (c *Controller) busy() int { + // The number of in-flight requests is + // the number of initial capacity "tickets" (concurrency) + // minus the size of the semaphore channel + return c.config.Concurrency - len(c.sem) } func retry(callback func() (interface{}, error)) (res interface{}, err error) { - attempts := invokeTimeout / retryInterval + retryAge := 0 + attempt := 0 + wasExhausted := false - for i := 0; ; i++ { + for { res, err = callback() if err == nil { return res, nil } - if i >= (attempts - 1) { + if retryAge > invokeTimeout { return nil, err } - time.Sleep(retryInterval * time.Millisecond) + st, ok := status.FromError(err) + if !ok { + return nil, err + } + + log.Debugf("RPC failure %v %v", st.Message(), st.Code()) + + interval := retryUnavailableInterval + + if st.Code() == codes.ResourceExhausted { + interval = retryExhaustedInterval + if !wasExhausted { + attempt = 0 + wasExhausted = true + } + } else if wasExhausted { + wasExhausted = false + attempt = 0 + } + + delayMS := int(math.Pow(2, float64(attempt))) * interval + delay := time.Duration(delayMS) + + retryAge += delayMS + + time.Sleep(delay * time.Millisecond) + + attempt++ } }