Skip to content

Commit

Permalink
refactor: add DisconnectQueueConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Jan 14, 2020
1 parent a2ea46d commit ea42df3
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 60 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## 1.0.0-dev

- Add `--disconnect_timeout` option to specify the timeout for graceful shutdown of the disconnect queue. ([@palkan][])

- Add `mem_sys_bytes` metric. ([@palkan][])

Returns the total bytes of memory obtained from the OS
Expand Down
67 changes: 36 additions & 31 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,15 @@ func init() {

fs.StringVar(&defaults.RPC.Host, "rpc_host", "localhost:50051", "")
fs.StringVar(&headers, "headers", "cookie", "")
fs.IntVar(&defaults.DisconnectRate, "disconnect_rate", 100, "")

fs.IntVar(&defaults.WS.ReadBufferSize, "read_buffer_size", 1024, "")
fs.IntVar(&defaults.WS.WriteBufferSize, "write_buffer_size", 1024, "")
fs.Int64Var(&defaults.WS.MaxMessageSize, "max_message_size", 65536, "")
fs.BoolVar(&defaults.WS.EnableCompression, "enable_ws_compression", false, "")

fs.IntVar(&defaults.DisconnectQueue.Rate, "disconnect_rate", 100, "")
fs.IntVar(&defaults.DisconnectQueue.ShutdownTimeout, "disconnect_timeout", 5, "")

fs.StringVar(&defaults.LogLevel, "log_level", "info", "")
fs.StringVar(&defaults.LogFormat, "log_format", "text", "")
fs.BoolVar(&debugMode, "debug", false, "")
Expand Down Expand Up @@ -108,36 +111,38 @@ USAGE
anycable-go [options]
OPTIONS
--host Server host, default: localhost, env: ANYCABLE_HOST
--port Server port, default: 8080, env: ANYCABLE_PORT, PORT
--path WebSocket endpoint path, default: /cable, env: ANYCABLE_PATH
--health-path HTTP health endpoint path, default: /health, env: ANYCABLE_HEALTH_PATH
--ssl_cert SSL certificate path, env: ANYCABLE_SSL_CERT
--ssl_key SSL private key path, env: ANYCABLE_SSL_KEY
--redis_url Redis url, default: redis://localhost:6379/5, env: ANYCABLE_REDIS_URL, REDIS_URL
--redis_channel Redis channel for broadcasts, default: __anycable__, env: ANYCABLE_REDIS_CHANNEL
--rpc_host RPC service address, default: localhost:50051, env: ANYCABLE_RPC_HOST
--headers List of headers to proxy to RPC, default: cookie, env: ANYCABLE_HEADERS
--disconnect_rate Max number of Disconnect calls per second, default: 100, env: ANYCABLE_DISCONNECT_RATE
--log_level Set logging level (debug/info/warn/error/fatal), default: info, env: ANYCABLE_LOG_LEVEL
--log_format Set logging format (text, json), default: text, env: ANYCABLE_LOG_FORMAT
--debug Enable debug mode (more verbose logging), default: false, env: ANYCABLE_DEBUG
--metrics_log Enable metrics logging (with info level), default: false, env: ANYCABLE_METRICS_LOG
--metrics_log_interval Specify how often flush metrics logs (in seconds), default: 15, env: ANYCABLE_METRICS_LOG_INTERVAL
--metrics_log_formatter Specify the path to custom Ruby formatter script (only supported on MacOS and Linux), default: "" (none), env: ANYCABLE_METRICS_LOG_FORMATTER
--metrics_http Enable HTTP metrics endpoint at the specified path, default: "" (disabled), env: ANYCABLE_METRICS_HTTP
--metrics_host Server host for metrics endpoint, default: the same as for main server, env: ANYCABLE_METRICS_HOST
--metrics_port Server port for metrics endpoint, default: the same as for main server, env: ANYCABLE_METRICS_PORT
--read_buffer_size WebSocket connection read buffer size, default: 1024, env: ANYCABLE_READ_BUFFER_SIZE
--write_buffer_size WebSocket connection write buffer size, default: 1024, env: ANYCABLE_WRITE_BUFFER_SIZE
--max_message_size Maximum size of a message in bytes, default: 65536, env: ANYCABLE_MAX_MESSAGE_SIZE
--enable_ws_compression Enable experimental WebSocket per message compression, default: false, env: ANYCABLE_ENABLE_WS_COMPRESSION
--host Server host, default: localhost, env: ANYCABLE_HOST
--port Server port, default: 8080, env: ANYCABLE_PORT, PORT
--path WebSocket endpoint path, default: /cable, env: ANYCABLE_PATH
--health-path HTTP health endpoint path, default: /health, env: ANYCABLE_HEALTH_PATH
--ssl_cert SSL certificate path, env: ANYCABLE_SSL_CERT
--ssl_key SSL private key path, env: ANYCABLE_SSL_KEY
--redis_url Redis url, default: redis://localhost:6379/5, env: ANYCABLE_REDIS_URL, REDIS_URL
--redis_channel Redis channel for broadcasts, default: __anycable__, env: ANYCABLE_REDIS_CHANNEL
--rpc_host RPC service address, default: localhost:50051, env: ANYCABLE_RPC_HOST
--headers List of headers to proxy to RPC, default: cookie, env: ANYCABLE_HEADERS
--disconnect_rate Max number of Disconnect calls per second, default: 100, env: ANYCABLE_DISCONNECT_RATE
--disconnect_timeout Graceful shutdown timeouts (in seconds), default: 5, env: ANYCABLE_DISCONNECT_TIMEOUT
--log_level Set logging level (debug/info/warn/error/fatal), default: info, env: ANYCABLE_LOG_LEVEL
--log_format Set logging format (text, json), default: text, env: ANYCABLE_LOG_FORMAT
--debug Enable debug mode (more verbose logging), default: false, env: ANYCABLE_DEBUG
--metrics_log Enable metrics logging (with info level), default: false, env: ANYCABLE_METRICS_LOG
--metrics_log_interval Specify how often flush metrics logs (in seconds), default: 15, env: ANYCABLE_METRICS_LOG_INTERVAL
--metrics_log_formatter Specify the path to custom Ruby formatter script (only supported on MacOS and Linux), default: "" (none), env: ANYCABLE_METRICS_LOG_FORMATTER
--metrics_http Enable HTTP metrics endpoint at the specified path, default: "" (disabled), env: ANYCABLE_METRICS_HTTP
--metrics_host Server host for metrics endpoint, default: the same as for main server, env: ANYCABLE_METRICS_HOST
--metrics_port Server port for metrics endpoint, default: the same as for main server, env: ANYCABLE_METRICS_PORT
--read_buffer_size WebSocket connection read buffer size, default: 1024, env: ANYCABLE_READ_BUFFER_SIZE
--write_buffer_size WebSocket connection write buffer size, default: 1024, env: ANYCABLE_WRITE_BUFFER_SIZE
--max_message_size Maximum size of a message in bytes, default: 65536, env: ANYCABLE_MAX_MESSAGE_SIZE
--enable_ws_compression Enable experimental WebSocket per message compression, default: false, env: ANYCABLE_ENABLE_WS_COMPRESSION
-h This help screen
-v Show version
Expand Down
2 changes: 1 addition & 1 deletion cmd/anycable-go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func main() {

appNode := node.NewNode(controller, metrics)

disconnector := node.NewDisconnectQueue(appNode, config.DisconnectRate)
disconnector := node.NewDisconnectQueue(appNode, &config.DisconnectQueue)
go disconnector.Run()

appNode.SetDisconnector(disconnector)
Expand Down
2 changes: 1 addition & 1 deletion cmd/gobench-cable/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func main() {
appNode := node.NewNode(controller, metrics)

// There could be different disconnectors in the future
disconnector := node.NewDisconnectQueue(appNode, config.DisconnectRate)
disconnector := node.NewDisconnectQueue(appNode, &config.DisconnectQueue)
go disconnector.Run()

appNode.SetDisconnector(disconnector)
Expand Down
32 changes: 17 additions & 15 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,28 @@ package config

import (
"github.com/anycable/anycable-go/metrics"
"github.com/anycable/anycable-go/node"
"github.com/anycable/anycable-go/rpc"
"github.com/anycable/anycable-go/server"
)

// Config contains main application configuration
type Config struct {
RPC rpc.Config
RedisURL string
RedisChannel string
Host string
Port int
Path string
HealthPath string
Headers []string
SSL server.SSLConfig
WS server.WSConfig
MaxMessageSize int64
DisconnectRate int
LogLevel string
LogFormat string
Metrics metrics.Config
RPC rpc.Config
RedisURL string
RedisChannel string
Host string
Port int
Path string
HealthPath string
Headers []string
SSL server.SSLConfig
WS server.WSConfig
MaxMessageSize int64
DisconnectQueue node.DisconnectQueueConfig
LogLevel string
LogFormat string
Metrics metrics.Config
}

// New returns a new empty config
Expand All @@ -32,5 +33,6 @@ func New() Config {
config.WS = server.NewWSConfig()
config.Metrics = metrics.NewConfig()
config.RPC = rpc.NewConfig()
config.DisconnectQueue = node.NewDisconnectQueueConfig()
return config
}
31 changes: 21 additions & 10 deletions node/disconnect_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,26 @@ import (
"github.com/apex/log"
)

const (
// How much time wait to call all enqueued calls
// TODO: make configurable
waitTime = 5 * time.Second
)
// DisconnectQueueConfig contains DisconnectQueue configuration
type DisconnectQueueConfig struct {
// Limit the number of Disconnect RPC calls per second
Rate int
// How much time wait to call all enqueued calls at exit (in seconds)
ShutdownTimeout int
}

// NewDisconnectQueueConfig builds a new config
func NewDisconnectQueueConfig() DisconnectQueueConfig {
return DisconnectQueueConfig{ShutdownTimeout: 5, Rate: 100}
}

// DisconnectQueue is a rate-limited executor
type DisconnectQueue struct {
node *Node
// Limit the number of RPC calls per second
// Throttling rate
rate time.Duration
// Graceful shutdown timeout
timeout time.Duration
// Call RPC Disconnect for connections
disconnect chan *Session
// Logger with context
Expand All @@ -32,8 +41,9 @@ type DisconnectQueue struct {
}

// NewDisconnectQueue builds new queue with a specified rate (max calls per second)
func NewDisconnectQueue(node *Node, rate int) *DisconnectQueue {
rateDuration := time.Millisecond * time.Duration(1000/rate)
func NewDisconnectQueue(node *Node, config *DisconnectQueueConfig) *DisconnectQueue {
rateDuration := time.Millisecond * time.Duration(1000/config.Rate)
timeout := time.Duration(config.ShutdownTimeout) * time.Second

ctx := log.WithField("context", "disconnector")

Expand All @@ -43,6 +53,7 @@ func NewDisconnectQueue(node *Node, rate int) *DisconnectQueue {
node: node,
disconnect: make(chan *Session, 4096),
rate: rateDuration,
timeout: timeout,
log: ctx,
shutdown: make(chan struct{}, 1),
}
Expand Down Expand Up @@ -82,7 +93,7 @@ func (d *DisconnectQueue) Shutdown() error {
return nil
}

d.log.Infof("Invoking remaining disconnects: %d", left)
d.log.Infof("Invoking remaining disconnects for %s: %d", d.timeout, left)

for {
select {
Expand All @@ -98,7 +109,7 @@ func (d *DisconnectQueue) Shutdown() error {
if left == 0 {
return nil
}
case <-time.After(waitTime):
case <-time.After(d.timeout):
return fmt.Errorf("Had no time to invoke Disconnect calls: %d", len(d.disconnect))
}
}
Expand Down
4 changes: 3 additions & 1 deletion node/disconnect_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ func TestDisconnectQueue_Enqueue(t *testing.T) {

func newQueue() *DisconnectQueue {
node := NewMockNode()
q := NewDisconnectQueue(&node, 1)
config := NewDisconnectQueueConfig()
config.Rate = 1
q := NewDisconnectQueue(&node, &config)

return q
}
4 changes: 3 additions & 1 deletion node/node_mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ func NewMockNode() Node {
log: log.WithField("context", "test"),
}
node.registerMetrics()
node.disconnector = NewDisconnectQueue(&node, 1)
config := NewDisconnectQueueConfig()
config.Rate = 1
node.disconnector = NewDisconnectQueue(&node, &config)
return node
}

Expand Down

0 comments on commit ea42df3

Please sign in to comment.