Skip to content

Commit

Permalink
refactor(rpc): replace pool with single connection implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Jan 14, 2020
1 parent ea42df3 commit 1c166e2
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 67 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

## 1.0.0-dev

- Use single gRPC client instance instead of a pool. ([@palkan][])

A gRPC connection provides concurrency via HTTP/2 streams (with load balancing). Using a pool doesn't bring any performance
improvements and sometimes causes instability (e.g., ResourceExhausted or Unavailable errors under load).

We still limit the number of concurrent RPC requests. Now you can configure it via the `--rpc_concurrency` setting.

- Add `--disconnect_timeout` option to specify the timeout for graceful shutdown of the disconnect queue. ([@palkan][])

- Add `mem_sys_bytes` metric. ([@palkan][])
Expand Down
2 changes: 2 additions & 0 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func init() {
fs.StringVar(&defaults.RedisChannel, "redis_channel", "__anycable__", "")

fs.StringVar(&defaults.RPC.Host, "rpc_host", "localhost:50051", "")
fs.IntVar(&defaults.RPC.Concurrency, "rpc_concurrency", 28, "")
fs.StringVar(&headers, "headers", "cookie", "")

fs.IntVar(&defaults.WS.ReadBufferSize, "read_buffer_size", 1024, "")
Expand Down Expand Up @@ -123,6 +124,7 @@ OPTIONS
--redis_channel Redis channel for broadcasts, default: __anycable__, env: ANYCABLE_REDIS_CHANNEL
--rpc_host RPC service address, default: localhost:50051, env: ANYCABLE_RPC_HOST
--rpc_concurrency Max number of concurrent RPC request; should be slightly less than the RPC server concurrency, default: 28, env: ANYCABLE_RPC_CONCURRENCY
--headers List of headers to proxy to RPC, default: cookie, env: ANYCABLE_HEADERS
--disconnect_rate Max number of Disconnect calls per second, default: 100, env: ANYCABLE_DISCONNECT_RATE
Expand Down
7 changes: 6 additions & 1 deletion rpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ package rpc

// Config contains RPC controller configuration.
type Config struct {
// Host is the address of the RPC service (for example, localhost:50051).
Host string
// Concurrency is the max number of simultaneous RPC requests.
// It should be slightly less than the RPC server concurrency to avoid
// ResourceExhausted errors.
Concurrency int
}

// NewConfig builds a new config
func NewConfig() Config {
return Config{}
return Config{Concurrency: 28}
}
148 changes: 82 additions & 66 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,26 @@ import (
"context"
"errors"
"fmt"
"math"
"time"

"github.com/anycable/anycable-go/common"
"github.com/anycable/anycable-go/metrics"
"github.com/apex/log"

grpcpool "github.com/anycable/anycable-go/pool"
pb "github.com/anycable/anycable-go/protos"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

const (
retryInterval = 500
invokeTimeout = 3000

initialCapacity = 5
maxCapacity = 50
retryExhaustedInterval = 10
retryUnavailableInterval = 100

metricsRPCCalls = "rpc_call_total"
metricsRPCFailures = "rpc_error_total"
Expand All @@ -30,7 +32,8 @@ const (
// Controller implements node.Controller interface for gRPC
type Controller struct {
config *Config
pool grpcpool.Pool
sem chan (struct{})
conn *grpc.ClientConn
metrics *metrics.Metrics
log *log.Entry
}
Expand All @@ -46,38 +49,50 @@ func NewController(metrics *metrics.Metrics, config *Config) *Controller {
// Start initializes the RPC connection and the semaphore limiting concurrent calls
func (c *Controller) Start() error {
host := c.config.Host
capacity := c.config.Concurrency

factory := func() (*grpc.ClientConn, error) {
return grpc.Dial(host, grpc.WithInsecure())
kacp := keepalive.ClientParameters{
Time: 10 * time.Second, // send pings every 10 seconds if there is no activity
PermitWithoutStream: true, // send pings even without active streams
}

pool, err := grpcpool.NewChannelPool(initialCapacity, maxCapacity, factory)
conn, err := grpc.Dial(
host,
grpc.WithInsecure(),
grpc.WithKeepaliveParams(kacp),
grpc.WithBalancerName("round_robin"),
)

c.sem = make(chan struct{}, capacity)
for i := 0; i < capacity; i++ {
c.sem <- struct{}{}
}

if err == nil {
c.log.Infof("RPC pool initialized: %s", host)
c.log.Infof("RPC controller initialized: %s (capacity: %d)", host, capacity)
}

c.pool = pool
c.conn = conn
return err
}

// Shutdown closes connections
func (c *Controller) Shutdown() error {
if c.pool == nil {
if c.conn == nil {
return nil
}

c.pool.Close()
defer c.conn.Close()

busy := c.pool.Busy()
busy := c.busy()

if busy > 0 {
c.log.Infof("Waiting for active RPC calls to finish: %d", busy)
}

// Wait for active connections
_, err := retry(func() (interface{}, error) {
busy := c.pool.Busy()
busy := c.busy()

if busy > 0 {
return false, fmt.Errorf("There are %d active RPC connections left", busy)
Expand All @@ -92,15 +107,10 @@ func (c *Controller) Shutdown() error {

// Authenticate performs Connect RPC call
func (c *Controller) Authenticate(sid string, path string, headers *map[string]string) (string, []string, error) {
conn, err := c.getConn()

if err != nil {
return "", nil, err
}
<-c.sem
defer func() { c.sem <- struct{}{} }()

defer conn.Close()

client := pb.NewRPCClient(conn.Conn)
client := pb.NewRPCClient(c.conn)

op := func() (interface{}, error) {
return client.Connect(newContext(sid), &pb.ConnectionRequest{Path: path, Headers: *headers})
Expand Down Expand Up @@ -134,15 +144,10 @@ func (c *Controller) Authenticate(sid string, path string, headers *map[string]s

// Subscribe performs Command RPC call with "subscribe" command
func (c *Controller) Subscribe(sid string, id string, channel string) (*common.CommandResult, error) {
conn, err := c.getConn()

if err != nil {
return nil, err
}
<-c.sem
defer func() { c.sem <- struct{}{} }()

defer conn.Close()

client := pb.NewRPCClient(conn.Conn)
client := pb.NewRPCClient(c.conn)

op := func() (interface{}, error) {
return client.Command(newContext(sid), &pb.CommandMessage{Command: "subscribe", Identifier: channel, ConnectionIdentifiers: id})
Expand All @@ -155,15 +160,10 @@ func (c *Controller) Subscribe(sid string, id string, channel string) (*common.C

// Unsubscribe performs Command RPC call with "unsubscribe" command
func (c *Controller) Unsubscribe(sid string, id string, channel string) (*common.CommandResult, error) {
conn, err := c.getConn()

if err != nil {
return nil, err
}
<-c.sem
defer func() { c.sem <- struct{}{} }()

defer conn.Close()

client := pb.NewRPCClient(conn.Conn)
client := pb.NewRPCClient(c.conn)

op := func() (interface{}, error) {
return client.Command(newContext(sid), &pb.CommandMessage{Command: "unsubscribe", Identifier: channel, ConnectionIdentifiers: id})
Expand All @@ -176,15 +176,10 @@ func (c *Controller) Unsubscribe(sid string, id string, channel string) (*common

// Perform performs Command RPC call with "perform" command
func (c *Controller) Perform(sid string, id string, channel string, data string) (*common.CommandResult, error) {
conn, err := c.getConn()

if err != nil {
return nil, err
}
<-c.sem
defer func() { c.sem <- struct{}{} }()

defer conn.Close()

client := pb.NewRPCClient(conn.Conn)
client := pb.NewRPCClient(c.conn)

op := func() (interface{}, error) {
return client.Command(newContext(sid), &pb.CommandMessage{Command: "message", Identifier: channel, ConnectionIdentifiers: id, Data: data})
Expand All @@ -197,15 +192,10 @@ func (c *Controller) Perform(sid string, id string, channel string, data string)

// Disconnect performs disconnect RPC call
func (c *Controller) Disconnect(sid string, id string, subscriptions []string, path string, headers *map[string]string) error {
conn, err := c.getConn()

if err != nil {
return err
}

defer conn.Close()
<-c.sem
defer func() { c.sem <- struct{}{} }()

client := pb.NewRPCClient(conn.Conn)
client := pb.NewRPCClient(c.conn)

op := func() (interface{}, error) {
return client.Disconnect(newContext(sid), &pb.DisconnectRequest{Identifiers: id, Subscriptions: subscriptions, Path: path, Headers: *headers})
Expand Down Expand Up @@ -266,31 +256,57 @@ func (c *Controller) parseCommandResponse(response interface{}, err error) (*com
return nil, errors.New("Failed to deserialize command response")
}

func (c *Controller) getConn() (*grpcpool.Conn, error) {
conn, err := c.pool.Get()

if err != nil {
return nil, err
}

return &conn, nil
// busy reports the number of in-flight RPC calls.
//
// The semaphore channel is pre-filled with one "ticket" per allowed
// concurrent call; every call takes a ticket before talking to the RPC server
// and puts it back when done. The number of in-flight calls is therefore the
// number of tickets currently missing from the channel.
//
// The total is read from cap(c.sem) rather than from c.config.Concurrency so
// that the result always agrees with the actual semaphore, even if the shared
// config is modified after the channel was created. For a nil semaphore
// (controller not started) it reports 0.
func (c *Controller) busy() int {
	return cap(c.sem) - len(c.sem)
}

func retry(callback func() (interface{}, error)) (res interface{}, err error) {
attempts := invokeTimeout / retryInterval
retryAge := 0
attempt := 0
wasExhausted := false

for i := 0; ; i++ {
for {
res, err = callback()

if err == nil {
return res, nil
}

if i >= (attempts - 1) {
if retryAge > invokeTimeout {
return nil, err
}

time.Sleep(retryInterval * time.Millisecond)
st, ok := status.FromError(err)
if !ok {
return nil, err
}

log.Debugf("RPC failure %v %v", st.Message(), st.Code())

interval := retryUnavailableInterval

if st.Code() == codes.ResourceExhausted {
interval = retryExhaustedInterval
if !wasExhausted {
attempt = 0
wasExhausted = true
}
} else if wasExhausted {
wasExhausted = false
attempt = 0
}

delayMS := int(math.Pow(2, float64(attempt))) * interval
delay := time.Duration(delayMS)

retryAge += delayMS

time.Sleep(delay * time.Millisecond)

attempt++
}
}

Expand Down

0 comments on commit 1c166e2

Please sign in to comment.