diff --git a/codecov.yml b/codecov.yml index da01f6f7c..40040d8b7 100644 --- a/codecov.yml +++ b/codecov.yml @@ -16,6 +16,7 @@ ignore: - "tests" - "plugins/metrics/config_test.go" - "plugins/websockets/storage/storage_test.go" + - "plugins/websockets/config.go" - "pkg/bst/bst_test.go" - "pkg/doc" - "pkg/pool/static_pool_test.go" diff --git a/go.mod b/go.mod index 0dbcc2a95..2ac9684ca 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( // SPIRAL ==== github.com/spiral/endure v1.0.2 github.com/spiral/errors v1.0.11 - github.com/spiral/goridge/v3 v3.1.3 + github.com/spiral/goridge/v3 v3.1.4 // =========== github.com/stretchr/testify v1.7.0 github.com/tklauser/go-sysconf v0.3.6 // indirect diff --git a/go.sum b/go.sum index 561b12303..f218097f5 100644 --- a/go.sum +++ b/go.sum @@ -388,8 +388,8 @@ github.com/spiral/endure v1.0.2/go.mod h1:/mnduq57eBKgKCwpuLgUp8Fn/c3h6JgWybG+0h github.com/spiral/errors v1.0.10/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/errors v1.0.11 h1:TGG+t3mNouLuRW54Ph7nHo4X3u4WhbxqEQmnIybi7Go= github.com/spiral/errors v1.0.11/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= -github.com/spiral/goridge/v3 v3.1.3 h1:mjKLxWBmCN2ZErkA1mTkQAMbYjWQK8cb7DluQA/9wjU= -github.com/spiral/goridge/v3 v3.1.3/go.mod h1:swcWZW7nP+KU9rgyRf6w7CfNDCWRC/vePE2+AKtoqjk= +github.com/spiral/goridge/v3 v3.1.4 h1:5egVVTfaD1PO4MRgzU0yyog86pAh+JIOk7xhe7BtG40= +github.com/spiral/goridge/v3 v3.1.4/go.mod h1:swcWZW7nP+KU9rgyRf6w7CfNDCWRC/vePE2+AKtoqjk= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go index 4926cad6d..d021dbbe0 100644 --- a/pkg/pubsub/interface.go +++ b/pkg/pubsub/interface.go @@ -44,3 +44,7 @@ type Publisher interface { type Reader interface { Next() (*websocketsv1.Message, error) } + +type PSProvider interface { + PSProvide(key string) (PubSub, error) +} diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go index 9b4cf9f79..28e2a89cc 100644 --- a/plugins/kv/drivers/boltdb/plugin.go +++ b/plugins/kv/drivers/boltdb/plugin.go @@ -46,7 +46,7 @@ func (s *Plugin) Stop() error { return nil } -func (s *Plugin) Provide(key string) (kv.Storage, error) { +func (s *Plugin) KVProvide(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop) if err != nil { diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go index 3997e0d49..936b20477 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/kv/drivers/memcached/plugin.go @@ -34,7 +34,7 @@ func (s *Plugin) Name() string { // Available interface implementation func (s *Plugin) Available() {} -func (s *Plugin) Provide(key string) (kv.Storage, error) { +func (s *Plugin) KVProvide(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin) if err != nil { diff --git a/plugins/kv/drivers/memory/plugin.go b/plugins/kv/drivers/memory/plugin.go index 2be7caaec..da81017e1 100644 --- a/plugins/kv/drivers/memory/plugin.go +++ b/plugins/kv/drivers/memory/plugin.go @@ -45,7 +45,7 @@ func (s *Plugin) Stop() error { return nil } -func (s *Plugin) Provide(key string) (kv.Storage, error) { +func (s *Plugin) KVProvide(key string) (kv.Storage, error) { const op = errors.Op("inmemory_plugin_provide") st, err := NewInMemoryDriver(s.log, key, s.cfgPlugin, s.stop) if err != nil { diff --git a/plugins/kv/drivers/redis/config.go b/plugins/kv/drivers/redis/config.go deleted file mode 100644 index 413482361..000000000 --- a/plugins/kv/drivers/redis/config.go +++ /dev/null @@ -1,34 +0,0 @@ -package redis - -import "time" - -type Config struct { - Addrs []string `mapstructure:"addrs"` - DB int `mapstructure:"db"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - MasterName string `mapstructure:"master_name"` - SentinelPassword string `mapstructure:"sentinel_password"` - RouteByLatency bool `mapstructure:"route_by_latency"` - RouteRandomly bool `mapstructure:"route_randomly"` - MaxRetries int `mapstructure:"max_retries"` - DialTimeout time.Duration `mapstructure:"dial_timeout"` - MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` - MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` - PoolSize int `mapstructure:"pool_size"` - MinIdleConns int `mapstructure:"min_idle_conns"` - MaxConnAge time.Duration `mapstructure:"max_conn_age"` - ReadTimeout time.Duration `mapstructure:"read_timeout"` - WriteTimeout time.Duration `mapstructure:"write_timeout"` - PoolTimeout time.Duration `mapstructure:"pool_timeout"` - IdleTimeout time.Duration `mapstructure:"idle_timeout"` - IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"` - ReadOnly bool `mapstructure:"read_only"` -} - -// InitDefaults initializing fill config with default values -func (s *Config) InitDefaults() { - if s.Addrs == nil { - s.Addrs = []string{"localhost:6379"} // default addr is pointing to local storage - } -} diff --git a/plugins/kv/drivers/redis/plugin.go b/plugins/kv/drivers/redis/plugin.go deleted file mode 100644 index 3694c5a72..000000000 --- a/plugins/kv/drivers/redis/plugin.go +++ /dev/null @@ -1,51 +0,0 @@ -package redis - -import ( - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const PluginName = "redis" - -// Plugin BoltDB K/V storage. -type Plugin struct { - cfgPlugin config.Configurer - // logger - log logger.Logger -} - -func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - if !cfg.Has(kv.PluginName) { - return errors.E(errors.Disabled) - } - - s.log = log - s.cfgPlugin = cfg - return nil -} - -// Serve is noop here -func (s *Plugin) Serve() chan error { - return make(chan error) -} - -func (s *Plugin) Stop() error { - return nil -} - -func (s *Plugin) Provide(key string) (kv.Storage, error) { - const op = errors.Op("redis_plugin_provide") - st, err := NewRedisDriver(s.log, key, s.cfgPlugin) - if err != nil { - return nil, errors.E(op, err) - } - - return st, nil -} - -// Name returns plugin name -func (s *Plugin) Name() string { - return PluginName -} diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go index 744c6b51d..5aedd5c3c 100644 --- a/plugins/kv/interface.go +++ b/plugins/kv/interface.go @@ -37,5 +37,5 @@ type StorageDriver interface { // Provider provides storage based on the config type Provider interface { // Provide provides Storage based on the config key - Provide(key string) (Storage, error) + KVProvide(key string) (Storage, error) } diff --git a/plugins/kv/storage.go b/plugins/kv/plugin.go similarity index 83% rename from plugins/kv/storage.go rename to plugins/kv/plugin.go index 9a609735f..efe922529 100644 --- a/plugins/kv/storage.go +++ b/plugins/kv/plugin.go @@ -50,7 +50,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { return nil } -func (p *Plugin) Serve() chan error { +func (p *Plugin) Serve() chan error { //nolint:gocognit errCh := make(chan error, 1) const op = errors.Op("kv_plugin_serve") // key - storage name in the config @@ -100,7 +100,7 @@ func (p *Plugin) Serve() chan error { continue } - storage, err := p.drivers[memcached].Provide(configKey) + storage, err := p.drivers[memcached].KVProvide(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -115,7 +115,7 @@ func (p *Plugin) Serve() chan error { continue } - storage, err := p.drivers[boltdb].Provide(configKey) + storage, err := p.drivers[boltdb].KVProvide(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -129,7 +129,7 @@ func (p *Plugin) Serve() chan error { continue } - storage, err := p.drivers[memory].Provide(configKey) + storage, err := p.drivers[memory].KVProvide(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -143,15 +143,33 @@ func (p *Plugin) Serve() chan error { continue } - storage, err := p.drivers[redis].Provide(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh + // first - try local configuration + switch { + case p.cfgPlugin.Has(configKey): + storage, err := p.drivers[redis].KVProvide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the storage + p.storages[k] = storage + case p.cfgPlugin.Has(redis): + storage, err := p.drivers[redis].KVProvide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the storage + p.storages[k] = storage + continue + default: + // otherwise - error, no local or global config + p.log.Warn("no global or local redis configuration provided", "key", configKey) + continue } - // save the storage - p.storages[k] = storage - default: p.log.Error("unknown storage", errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver]))) } diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go new file mode 100644 index 000000000..650f0b4b9 --- /dev/null +++ b/plugins/memory/plugin.go @@ -0,0 +1,25 @@ +package memory + +import ( + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const PluginName string = "memory" + +type Plugin struct { + log logger.Logger +} + +func (p *Plugin) Init(log logger.Logger) error { + p.log = log + return nil +} + +func (p *Plugin) PSProvide(key string) (pubsub.PubSub, error) { + return NewPubSubDriver(p.log, key) +} + +func (p *Plugin) Name() string { + return PluginName +} diff --git a/plugins/websockets/memory/inMemory.go b/plugins/memory/pubsub.go similarity index 71% rename from plugins/websockets/memory/inMemory.go rename to plugins/memory/pubsub.go index cef281827..75cd9d245 100644 --- a/plugins/websockets/memory/inMemory.go +++ b/plugins/memory/pubsub.go @@ -10,36 +10,36 @@ import ( "google.golang.org/protobuf/proto" ) -type Plugin struct { +type PubSubDriver struct { sync.RWMutex - log logger.Logger - // channel with the messages from the RPC pushCh chan []byte // user-subscribed topics storage bst.Storage + log logger.Logger } -func NewInMemory(log logger.Logger) pubsub.PubSub { - return &Plugin{ - log: log, +func NewPubSubDriver(log logger.Logger, _ string) (pubsub.PubSub, error) { + ps := &PubSubDriver{ pushCh: make(chan []byte, 10), storage: bst.NewBST(), + log: log, } + return ps, nil } -func (p *Plugin) Publish(message []byte) error { +func (p *PubSubDriver) Publish(message []byte) error { p.pushCh <- message return nil } -func (p *Plugin) PublishAsync(message []byte) { +func (p *PubSubDriver) PublishAsync(message []byte) { go func() { p.pushCh <- message }() } -func (p *Plugin) Subscribe(connectionID string, topics ...string) error { +func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error { p.Lock() defer p.Unlock() for i := 0; i < len(topics); i++ { @@ -48,7 +48,7 @@ func (p *Plugin) Subscribe(connectionID string, topics ...string) error { return nil } -func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error { +func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error { p.Lock() defer p.Unlock() for i := 0; i < len(topics); i++ { @@ -57,7 +57,7 @@ func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error { return nil } -func (p *Plugin) Connections(topic string, res map[string]struct{}) { +func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { p.RLock() defer p.RUnlock() @@ -67,7 +67,7 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) { } } -func (p *Plugin) Next() (*websocketsv1.Message, error) { +func (p *PubSubDriver) Next() (*websocketsv1.Message, error) { msg := <-p.pushCh if msg == nil { return nil, nil diff --git a/plugins/redis/clients.go b/plugins/redis/clients.go new file mode 100644 index 000000000..d0a184d2f --- /dev/null +++ b/plugins/redis/clients.go @@ -0,0 +1,84 @@ +package redis + +import ( + "github.com/go-redis/redis/v8" + "github.com/spiral/errors" +) + +// RedisClient return a client based on the provided section key +// key sample: kv.some-section.redis +// kv.redis +// redis (root) +func (p *Plugin) RedisClient(key string) (redis.UniversalClient, error) { + const op = errors.Op("redis_get_client") + + if !p.cfgPlugin.Has(key) { + return nil, errors.E(op, errors.Errorf("no such section: %s", key)) + } + + cfg := &Config{} + + err := p.cfgPlugin.UnmarshalKey(key, cfg) + if err != nil { + return nil, errors.E(op, err) + } + + cfg.InitDefaults() + + uc := redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: cfg.Addrs, + DB: cfg.DB, + Username: cfg.Username, + Password: cfg.Password, + SentinelPassword: cfg.SentinelPassword, + MaxRetries: cfg.MaxRetries, + MinRetryBackoff: cfg.MaxRetryBackoff, + MaxRetryBackoff: cfg.MaxRetryBackoff, + DialTimeout: cfg.DialTimeout, + ReadTimeout: cfg.ReadTimeout, + WriteTimeout: cfg.WriteTimeout, + PoolSize: cfg.PoolSize, + MinIdleConns: cfg.MinIdleConns, + MaxConnAge: cfg.MaxConnAge, + PoolTimeout: cfg.PoolTimeout, + IdleTimeout: cfg.IdleTimeout, + IdleCheckFrequency: cfg.IdleCheckFreq, + ReadOnly: cfg.ReadOnly, + RouteByLatency: cfg.RouteByLatency, + RouteRandomly: cfg.RouteRandomly, + MasterName: cfg.MasterName, + }) + + return uc, nil +} + +func (p *Plugin) DefaultClient() redis.UniversalClient { + cfg := &Config{} + cfg.InitDefaults() + + uc := redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: cfg.Addrs, + DB: cfg.DB, + Username: cfg.Username, + Password: cfg.Password, + SentinelPassword: cfg.SentinelPassword, + MaxRetries: cfg.MaxRetries, + MinRetryBackoff: cfg.MaxRetryBackoff, + MaxRetryBackoff: cfg.MaxRetryBackoff, + DialTimeout: cfg.DialTimeout, + ReadTimeout: cfg.ReadTimeout, + WriteTimeout: cfg.WriteTimeout, + PoolSize: cfg.PoolSize, + MinIdleConns: cfg.MinIdleConns, + MaxConnAge: cfg.MaxConnAge, + PoolTimeout: cfg.PoolTimeout, + IdleTimeout: cfg.IdleTimeout, + IdleCheckFrequency: cfg.IdleCheckFreq, + ReadOnly: cfg.ReadOnly, + RouteByLatency: cfg.RouteByLatency, + RouteRandomly: cfg.RouteRandomly, + MasterName: cfg.MasterName, + }) + + return uc +} diff --git a/plugins/redis/interface.go b/plugins/redis/interface.go index c0be61377..189b00022 100644 --- a/plugins/redis/interface.go +++ b/plugins/redis/interface.go @@ -4,6 +4,9 @@ import "github.com/go-redis/redis/v8" // Redis in the redis KV plugin interface type Redis interface { - // GetClient provides universal redis client - GetClient() redis.UniversalClient + // RedisClient provides universal redis client + RedisClient(key string) (redis.UniversalClient, error) + + // DefaultClient provide default redis client based on redis defaults + DefaultClient() redis.UniversalClient } diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/redis/kv.go similarity index 100% rename from plugins/kv/drivers/redis/driver.go rename to plugins/redis/kv.go diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 47ffeb397..24c21b558 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -1,15 +1,14 @@ package redis import ( - "context" "sync" "github.com/go-redis/redis/v8" "github.com/spiral/errors" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" - "google.golang.org/protobuf/proto" ) const PluginName = "redis" @@ -17,80 +16,37 @@ const PluginName = "redis" type Plugin struct { sync.RWMutex // config for RR integration - cfg *Config + cfgPlugin config.Configurer // logger log logger.Logger // redis universal client universalClient redis.UniversalClient // fanIn implementation used to deliver messages from all channels to the single websocket point - fanin *FanIn -} - -func (p *Plugin) GetClient() redis.UniversalClient { - return p.universalClient + stopCh chan struct{} } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("redis_plugin_init") - - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) - } - - err := cfg.UnmarshalKey(PluginName, &p.cfg) - if err != nil { - return errors.E(op, errors.Disabled, err) - } - - p.cfg.InitDefaults() p.log = log - - p.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: p.cfg.Addrs, - DB: p.cfg.DB, - Username: p.cfg.Username, - Password: p.cfg.Password, - SentinelPassword: p.cfg.SentinelPassword, - MaxRetries: p.cfg.MaxRetries, - MinRetryBackoff: p.cfg.MaxRetryBackoff, - MaxRetryBackoff: p.cfg.MaxRetryBackoff, - DialTimeout: p.cfg.DialTimeout, - ReadTimeout: p.cfg.ReadTimeout, - WriteTimeout: p.cfg.WriteTimeout, - PoolSize: p.cfg.PoolSize, - MinIdleConns: p.cfg.MinIdleConns, - MaxConnAge: p.cfg.MaxConnAge, - PoolTimeout: p.cfg.PoolTimeout, - IdleTimeout: p.cfg.IdleTimeout, - IdleCheckFrequency: p.cfg.IdleCheckFreq, - ReadOnly: p.cfg.ReadOnly, - RouteByLatency: p.cfg.RouteByLatency, - RouteRandomly: p.cfg.RouteRandomly, - MasterName: p.cfg.MasterName, - }) - - // init fanin - p.fanin = newFanIn(p.universalClient, log) + p.cfgPlugin = cfg + p.stopCh = make(chan struct{}, 1) return nil } func (p *Plugin) Serve() chan error { - errCh := make(chan error) - return errCh + return make(chan error) } func (p *Plugin) Stop() error { const op = errors.Op("redis_plugin_stop") - err := p.fanin.stop() - if err != nil { - return errors.E(op, err) - } + p.stopCh <- struct{}{} - err = p.universalClient.Close() - if err != nil { - return errors.E(op, err) + if p.universalClient != nil { + err := p.universalClient.Close() + if err != nil { + return errors.E(op, err) + } } return nil @@ -103,112 +59,17 @@ func (p *Plugin) Name() string { // Available interface implementation func (p *Plugin) Available() {} -func (p *Plugin) Publish(msg []byte) error { - p.Lock() - defer p.Unlock() - - m := &websocketsv1.Message{} - err := proto.Unmarshal(msg, m) - if err != nil { - return errors.E(err) - } - - for j := 0; j < len(m.GetTopics()); j++ { - f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg) - if f.Err() != nil { - return f.Err() - } - } - return nil -} - -func (p *Plugin) PublishAsync(msg []byte) { - go func() { - p.Lock() - defer p.Unlock() - m := &websocketsv1.Message{} - err := proto.Unmarshal(msg, m) - if err != nil { - p.log.Error("message unmarshal error") - return - } - - for j := 0; j < len(m.GetTopics()); j++ { - f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg) - if f.Err() != nil { - p.log.Error("redis publish", "error", f.Err()) - } - } - }() -} - -func (p *Plugin) Subscribe(connectionID string, topics ...string) error { - // just add a connection - for i := 0; i < len(topics); i++ { - // key - topic - // value - connectionID - hset := p.universalClient.SAdd(context.Background(), topics[i], connectionID) - res, err := hset.Result() - if err != nil { - return err - } - if res == 0 { - p.log.Warn("could not subscribe to the provided topic", "connectionID", connectionID, "topic", topics[i]) - continue - } - } - - // and subscribe after - return p.fanin.sub(topics...) -} - -func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error { - // Remove topics from the storage - for i := 0; i < len(topics); i++ { - srem := p.universalClient.SRem(context.Background(), topics[i], connectionID) - if srem.Err() != nil { - return srem.Err() - } - } - - for i := 0; i < len(topics); i++ { - // if there are no such topics, we can safely unsubscribe from the redis - exists := p.universalClient.Exists(context.Background(), topics[i]) - res, err := exists.Result() - if err != nil { - return err - } - - // if we have associated connections - skip - if res == 1 { // exists means that topic still exists and some other nodes may have connections associated with it - continue - } - - // else - unsubscribe - err = p.fanin.unsub(topics[i]) - if err != nil { - return err - } - } - - return nil -} - -func (p *Plugin) Connections(topic string, res map[string]struct{}) { - hget := p.universalClient.SMembersMap(context.Background(), topic) - r, err := hget.Result() +// KVProvide provides KV storage implementation over the redis plugin +func (p *Plugin) KVProvide(key string) (kv.Storage, error) { + const op = errors.Op("redis_plugin_provide") + st, err := NewRedisDriver(p.log, key, p.cfgPlugin) if err != nil { - panic(err) + return nil, errors.E(op, err) } - // assighn connections - // res expected to be from the sync.Pool - for k := range r { - res[k] = struct{}{} - } + return st, nil } -// Next return next message -func (p *Plugin) Next() (*websocketsv1.Message, error) { - return <-p.fanin.consume(), nil +func (p *Plugin) PSProvide(key string) (pubsub.PubSub, error) { + return NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh) } diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go new file mode 100644 index 000000000..dbda7ea4a --- /dev/null +++ b/plugins/redis/pubsub.go @@ -0,0 +1,189 @@ +package redis + +import ( + "context" + "sync" + + "github.com/go-redis/redis/v8" + "github.com/spiral/errors" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + "google.golang.org/protobuf/proto" +) + +type PubSubDriver struct { + sync.RWMutex + cfg *Config `mapstructure:"redis"` + + log logger.Logger + fanin *FanIn + universalClient redis.UniversalClient + stopCh chan struct{} +} + +func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stopCh chan struct{}) (pubsub.PubSub, error) { + const op = errors.Op("new_pub_sub_driver") + ps := &PubSubDriver{ + log: log, + stopCh: stopCh, + } + + // will be different for every connected driver + err := cfgPlugin.UnmarshalKey(key, &ps.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + ps.cfg.InitDefaults() + + ps.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: ps.cfg.Addrs, + DB: ps.cfg.DB, + Username: ps.cfg.Username, + Password: ps.cfg.Password, + SentinelPassword: ps.cfg.SentinelPassword, + MaxRetries: ps.cfg.MaxRetries, + MinRetryBackoff: ps.cfg.MaxRetryBackoff, + MaxRetryBackoff: ps.cfg.MaxRetryBackoff, + DialTimeout: ps.cfg.DialTimeout, + ReadTimeout: ps.cfg.ReadTimeout, + WriteTimeout: ps.cfg.WriteTimeout, + PoolSize: ps.cfg.PoolSize, + MinIdleConns: ps.cfg.MinIdleConns, + MaxConnAge: ps.cfg.MaxConnAge, + PoolTimeout: ps.cfg.PoolTimeout, + IdleTimeout: ps.cfg.IdleTimeout, + IdleCheckFrequency: ps.cfg.IdleCheckFreq, + ReadOnly: ps.cfg.ReadOnly, + RouteByLatency: ps.cfg.RouteByLatency, + RouteRandomly: ps.cfg.RouteRandomly, + MasterName: ps.cfg.MasterName, + }) + + ps.fanin = newFanIn(ps.universalClient, log) + + ps.stop() + + return ps, nil +} + +func (p *PubSubDriver) stop() { + go func() { + for range p.stopCh { + _ = p.fanin.stop() + return + } + }() +} + +func (p *PubSubDriver) Publish(msg []byte) error { + p.Lock() + defer p.Unlock() + + m := &websocketsv1.Message{} + err := proto.Unmarshal(msg, m) + if err != nil { + return errors.E(err) + } + + for j := 0; j < len(m.GetTopics()); j++ { + f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg) + if f.Err() != nil { + return f.Err() + } + } + return nil +} + +func (p *PubSubDriver) PublishAsync(msg []byte) { + go func() { + p.Lock() + defer p.Unlock() + m := &websocketsv1.Message{} + err := proto.Unmarshal(msg, m) + if err != nil { + p.log.Error("message unmarshal error") + return + } + + for j := 0; j < len(m.GetTopics()); j++ { + f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg) + if f.Err() != nil { + p.log.Error("redis publish", "error", f.Err()) + } + } + }() +} + +func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error { + // just add a connection + for i := 0; i < len(topics); i++ { + // key - topic + // value - connectionID + hset := p.universalClient.SAdd(context.Background(), topics[i], connectionID) + res, err := hset.Result() + if err != nil { + return err + } + if res == 0 { + p.log.Warn("could not subscribe to the provided topic", "connectionID", connectionID, "topic", topics[i]) + continue + } + } + + // and subscribe after + return p.fanin.sub(topics...) +} + +func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error { + // Remove topics from the storage + for i := 0; i < len(topics); i++ { + srem := p.universalClient.SRem(context.Background(), topics[i], connectionID) + if srem.Err() != nil { + return srem.Err() + } + } + + for i := 0; i < len(topics); i++ { + // if there are no such topics, we can safely unsubscribe from the redis + exists := p.universalClient.Exists(context.Background(), topics[i]) + res, err := exists.Result() + if err != nil { + return err + } + + // if we have associated connections - skip + if res == 1 { // exists means that topic still exists and some other nodes may have connections associated with it + continue + } + + // else - unsubscribe + err = p.fanin.unsub(topics[i]) + if err != nil { + return err + } + } + + return nil +} + +func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { + hget := p.universalClient.SMembersMap(context.Background(), topic) + r, err := hget.Result() + if err != nil { + panic(err) + } + + // assighn connections + // res expected to be from the sync.Pool + for k := range r { + res[k] = struct{}{} + } +} + +// Next return next message +func (p *PubSubDriver) Next() (*websocketsv1.Message, error) { + return <-p.fanin.consume(), nil +} diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go index be4aaa828..93d9ac3b3 100644 --- a/plugins/websockets/config.go +++ b/plugins/websockets/config.go @@ -7,14 +7,48 @@ import ( ) /* +# GLOBAL +redis: + addrs: + - 'localhost:6379' + websockets: # pubsubs should implement PubSub interface to be collected via endure.Collects pubsubs:["redis", "amqp", "memory"] + # OR local + redis: + addrs: + - 'localhost:6379' + # path used as websockets path path: "/ws" */ +type RedisConfig struct { + Addrs []string `mapstructure:"addrs"` + DB int `mapstructure:"db"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + MasterName string `mapstructure:"master_name"` + SentinelPassword string `mapstructure:"sentinel_password"` + RouteByLatency bool `mapstructure:"route_by_latency"` + RouteRandomly bool `mapstructure:"route_randomly"` + MaxRetries int `mapstructure:"max_retries"` + DialTimeout time.Duration `mapstructure:"dial_timeout"` + MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` + MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` + PoolSize int `mapstructure:"pool_size"` + MinIdleConns int `mapstructure:"min_idle_conns"` + MaxConnAge time.Duration `mapstructure:"max_conn_age"` + ReadTimeout time.Duration `mapstructure:"read_timeout"` + WriteTimeout time.Duration `mapstructure:"write_timeout"` + PoolTimeout time.Duration `mapstructure:"pool_timeout"` + IdleTimeout time.Duration `mapstructure:"idle_timeout"` + IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"` + ReadOnly bool `mapstructure:"read_only"` +} + // Config represents configuration for the ws plugin type Config struct { // http path for the websocket @@ -23,6 +57,8 @@ type Config struct { PubSubs []string `mapstructure:"pubsubs"` Middleware []string `mapstructure:"middleware"` + Redis *RedisConfig `mapstructure:"redis"` + Pool *pool.Config `mapstructure:"pool"` } @@ -55,4 +91,11 @@ func (c *Config) InitDefault() { } c.Pool.Supervisor.InitDefaults() } + + if c.Redis != nil { + if c.Redis.Addrs == nil { + // append default + c.Redis.Addrs = append(c.Redis.Addrs, "localhost:6379") + } + } } diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 6ddd609c0..6dfe6ca38 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -2,6 +2,7 @@ package websockets import ( "context" + "fmt" "net/http" "sync" "time" @@ -23,7 +24,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/plugins/websockets/executor" - "github.com/spiral/roadrunner/v2/plugins/websockets/memory" "github.com/spiral/roadrunner/v2/plugins/websockets/pool" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" "google.golang.org/protobuf/proto" @@ -38,8 +38,11 @@ type Plugin struct { // Collection with all available pubsubs pubsubs map[string]pubsub.PubSub - cfg *Config - log logger.Logger + psProviders map[string]pubsub.PSProvider + + cfg *Config + cfgPlugin config.Configurer + log logger.Logger // global connections map connections sync.Map @@ -69,9 +72,12 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } p.cfg.InitDefault() - p.pubsubs = make(map[string]pubsub.PubSub) + p.psProviders = make(map[string]pubsub.PSProvider) + p.log = log + p.cfgPlugin = cfg + p.wsUpgrade = &websocket.Upgrader{ HandshakeTimeout: time.Second * 60, ReadBufferSize: 1024, @@ -80,14 +86,18 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.serveExit = make(chan struct{}) p.server = server - // attach default driver - p.pubsubs["memory"] = memory.NewInMemory(p.log) - return nil } func (p *Plugin) Serve() chan error { - errCh := make(chan error) + errCh := make(chan error, 1) + const op = errors.Op("websockets_plugin_serve") + + err := p.initPubSubs() + if err != nil { + errCh <- errors.E(op, err) + return errCh + } go func() { var err error @@ -133,6 +143,54 @@ func (p *Plugin) Serve() chan error { return errCh } +func (p *Plugin) initPubSubs() error { + for i := 0; i < len(p.cfg.PubSubs); i++ { + // don't need to have a section for the in-memory + if p.cfg.PubSubs[i] == "memory" { + if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok { + r, err := provider.PSProvide("") + if err != nil { + return err + } + + // append default in-memory provider + p.pubsubs["memory"] = r + } + continue + } + // key - memory, redis + if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok { + // try local key + switch { + // try local config first + case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])): + r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])) + if err != nil { + return err + } + + // append redis provider + p.pubsubs[p.cfg.PubSubs[i]] = r + case p.cfgPlugin.Has(p.cfg.PubSubs[i]): + r, err := provider.PSProvide(p.cfg.PubSubs[i]) + if err != nil { + return err + } + + // append redis provider + p.pubsubs[p.cfg.PubSubs[i]] = r + default: + return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i]) + } + } else { + // no such driver + p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders) + } + } + + return nil +} + func (p *Plugin) Stop() error { // close workers pool p.workersPool.Stop() @@ -167,8 +225,8 @@ func (p *Plugin) Name() string { } // GetPublishers collects all pubsubs -func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PubSub) { - p.pubsubs[name.Name()] = pub +func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PSProvider) { + p.psProviders[name.Name()] = pub } func (p *Plugin) Middleware(next http.Handler) http.Handler { @@ -389,7 +447,6 @@ func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValid } } -// go:inline func exec(ctx []byte, pool phpPool.Pool) (*validator.AccessValidator, error) { const op = errors.Op("exec") pd := payload.Payload{ diff --git a/tests/plugins/kv/configs/.rr-redis-global.yaml b/tests/plugins/kv/configs/.rr-redis-global.yaml new file mode 100644 index 000000000..d2e8aefe6 --- /dev/null +++ b/tests/plugins/kv/configs/.rr-redis-global.yaml @@ -0,0 +1,14 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +logs: + mode: development + level: error + +redis-rr: + addrs: + - 'localhost:6379' + +kv: + redis-rr: + driver: redis diff --git a/tests/plugins/kv/configs/.rr-redis-no-config.yaml b/tests/plugins/kv/configs/.rr-redis-no-config.yaml new file mode 100644 index 000000000..9cf063741 --- /dev/null +++ b/tests/plugins/kv/configs/.rr-redis-no-config.yaml @@ -0,0 +1,10 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +logs: + mode: development + level: error + +kv: + redis-rr: + driver: redis diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go index fd8a58cf8..e7e7735aa 100644 --- a/tests/plugins/kv/storage_plugin_test.go +++ b/tests/plugins/kv/storage_plugin_test.go @@ -18,8 +18,8 @@ import ( "github.com/spiral/roadrunner/v2/plugins/kv/drivers/boltdb" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memory" - "github.com/spiral/roadrunner/v2/plugins/kv/drivers/redis" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/redis" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/stretchr/testify/assert" ) @@ -158,6 +158,7 @@ func TestBoltDb(t *testing.T) { &boltdb.Plugin{}, &rpcPlugin.Plugin{}, &logger.ZapLogger{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -373,6 +374,7 @@ func TestMemcached(t *testing.T) { &memcached.Plugin{}, &rpcPlugin.Plugin{}, &logger.ZapLogger{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -801,6 +803,143 @@ func TestRedis(t *testing.T) { &redis.Plugin{}, &rpcPlugin.Plugin{}, &logger.ZapLogger{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("REDIS", testRPCMethodsRedis) + stopCh <- struct{}{} + wg.Wait() +} + +func TestRedisGlobalSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-redis-global.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &kv.Plugin{}, + &redis.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("REDIS", testRPCMethodsRedis) + stopCh <- struct{}{} + wg.Wait() +} + +func TestRedisNoConfig(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-redis-no-config.yaml", // should be used default + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &kv.Plugin{}, + &redis.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &memory.Plugin{}, ) assert.NoError(t, err) diff --git a/tests/plugins/redis/plugin1.go b/tests/plugins/redis/plugin1.go index e50213e5b..68da1394e 100644 --- a/tests/plugins/redis/plugin1.go +++ b/tests/plugins/redis/plugin1.go @@ -14,8 +14,10 @@ type Plugin1 struct { } func (p *Plugin1) Init(redis redisPlugin.Redis) error { - p.redisClient = redis.GetClient() - return nil + var err error + p.redisClient, err = redis.RedisClient("redis") + + return err } func (p *Plugin1) Serve() chan error { diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml new file mode 100644 index 000000000..27eab5579 --- /dev/null +++ b/tests/plugins/websockets/configs/.rr-websockets-redis-memory-local.yaml @@ -0,0 +1,38 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../psr-worker-bench.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:13235 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + +websockets: + pubsubs: [ "redis", "memory" ] + redis: + addrs: + - "localhost:6379" + + path: "/ws" + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml b/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml new file mode 100644 index 000000000..fd1257947 --- /dev/null +++ b/tests/plugins/websockets/configs/.rr-websockets-redis-no-section.yaml @@ -0,0 +1,34 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../psr-worker-bench.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:13235 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + + +websockets: + pubsubs: [ "redis", "memory" ] + path: "/ws" + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 3ef144eb8..07ee5f122 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -20,6 +20,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/redis" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" @@ -45,6 +46,7 @@ func TestBroadcastInit(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -153,6 +155,7 @@ func TestWSRedisAndMemory(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -211,6 +214,112 @@ func TestWSRedisAndMemory(t *testing.T) { wg.Wait() } +func TestWSRedisAndMemoryGlobal(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-websockets-redis-memory.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) + t.Run("RPCWsMemory", RPCWsMemory) + t.Run("RPCWsRedis", RPCWsRedis) + + stopCh <- struct{}{} + + wg.Wait() +} + +func TestWSRedisNoSection(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-websockets-redis-no-section.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + assert.Error(t, err) +} + func RPCWsMemoryPubAsync(t *testing.T) { da := websocket.Dialer{ Proxy: http.ProxyFromEnvironment, @@ -446,6 +555,7 @@ func TestWSMemoryDeny(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -573,6 +683,7 @@ func TestWSMemoryStop(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -665,6 +776,7 @@ func TestWSMemoryOk(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -853,6 +965,7 @@ func publish2(t *testing.T, command string, broker string, topics ...string) { assert.NoError(t, err) assert.True(t, ret.Ok) } + func messageWS(command string, broker string, payload []byte, topics ...string) *websocketsv1.Message { return &websocketsv1.Message{ Topics: topics,