Skip to content

Commit

Permalink
#723 fix(redis, ws): redis driver not available
Browse files Browse the repository at this point in the history
#723 fix(redis, ws): redis driver not available
  • Loading branch information
rustatian authored Jun 14, 2021
2 parents dc8ed20 + 9748651 commit a38a4e6
Show file tree
Hide file tree
Showing 27 changed files with 841 additions and 291 deletions.
1 change: 1 addition & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ ignore:
- "tests"
- "plugins/metrics/config_test.go"
- "plugins/websockets/storage/storage_test.go"
- "plugins/websockets/config.go"
- "pkg/bst/bst_test.go"
- "pkg/doc"
- "pkg/pool/static_pool_test.go"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
// SPIRAL ====
github.com/spiral/endure v1.0.2
github.com/spiral/errors v1.0.11
github.com/spiral/goridge/v3 v3.1.3
github.com/spiral/goridge/v3 v3.1.4
// ===========
github.com/stretchr/testify v1.7.0
github.com/tklauser/go-sysconf v0.3.6 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,8 @@ github.com/spiral/endure v1.0.2/go.mod h1:/mnduq57eBKgKCwpuLgUp8Fn/c3h6JgWybG+0h
github.com/spiral/errors v1.0.10/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/errors v1.0.11 h1:TGG+t3mNouLuRW54Ph7nHo4X3u4WhbxqEQmnIybi7Go=
github.com/spiral/errors v1.0.11/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/goridge/v3 v3.1.3 h1:mjKLxWBmCN2ZErkA1mTkQAMbYjWQK8cb7DluQA/9wjU=
github.com/spiral/goridge/v3 v3.1.3/go.mod h1:swcWZW7nP+KU9rgyRf6w7CfNDCWRC/vePE2+AKtoqjk=
github.com/spiral/goridge/v3 v3.1.4 h1:5egVVTfaD1PO4MRgzU0yyog86pAh+JIOk7xhe7BtG40=
github.com/spiral/goridge/v3 v3.1.4/go.mod h1:swcWZW7nP+KU9rgyRf6w7CfNDCWRC/vePE2+AKtoqjk=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
Expand Down
4 changes: 4 additions & 0 deletions pkg/pubsub/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@ type Publisher interface {
type Reader interface {
Next() (*websocketsv1.Message, error)
}

type PSProvider interface {
PSProvide(key string) (PubSub, error)
}
2 changes: 1 addition & 1 deletion plugins/kv/drivers/boltdb/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (s *Plugin) Stop() error {
return nil
}

func (s *Plugin) Provide(key string) (kv.Storage, error) {
func (s *Plugin) KVProvide(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion plugins/kv/drivers/memcached/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (s *Plugin) Name() string {
// Available interface implementation
func (s *Plugin) Available() {}

func (s *Plugin) Provide(key string) (kv.Storage, error) {
func (s *Plugin) KVProvide(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion plugins/kv/drivers/memory/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *Plugin) Stop() error {
return nil
}

func (s *Plugin) Provide(key string) (kv.Storage, error) {
func (s *Plugin) KVProvide(key string) (kv.Storage, error) {
const op = errors.Op("inmemory_plugin_provide")
st, err := NewInMemoryDriver(s.log, key, s.cfgPlugin, s.stop)
if err != nil {
Expand Down
34 changes: 0 additions & 34 deletions plugins/kv/drivers/redis/config.go

This file was deleted.

51 changes: 0 additions & 51 deletions plugins/kv/drivers/redis/plugin.go

This file was deleted.

2 changes: 1 addition & 1 deletion plugins/kv/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ type StorageDriver interface {
// Provider provides storage based on the config
type Provider interface {
// Provide provides Storage based on the config key
Provide(key string) (Storage, error)
KVProvide(key string) (Storage, error)
}
40 changes: 29 additions & 11 deletions plugins/kv/storage.go → plugins/kv/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
return nil
}

func (p *Plugin) Serve() chan error {
func (p *Plugin) Serve() chan error { //nolint:gocognit
errCh := make(chan error, 1)
const op = errors.Op("kv_plugin_serve")
// key - storage name in the config
Expand Down Expand Up @@ -100,7 +100,7 @@ func (p *Plugin) Serve() chan error {
continue
}

storage, err := p.drivers[memcached].Provide(configKey)
storage, err := p.drivers[memcached].KVProvide(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
Expand All @@ -115,7 +115,7 @@ func (p *Plugin) Serve() chan error {
continue
}

storage, err := p.drivers[boltdb].Provide(configKey)
storage, err := p.drivers[boltdb].KVProvide(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
Expand All @@ -129,7 +129,7 @@ func (p *Plugin) Serve() chan error {
continue
}

storage, err := p.drivers[memory].Provide(configKey)
storage, err := p.drivers[memory].KVProvide(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
Expand All @@ -143,15 +143,33 @@ func (p *Plugin) Serve() chan error {
continue
}

storage, err := p.drivers[redis].Provide(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
// first - try local configuration
switch {
case p.cfgPlugin.Has(configKey):
storage, err := p.drivers[redis].KVProvide(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
}

// save the storage
p.storages[k] = storage
case p.cfgPlugin.Has(redis):
storage, err := p.drivers[redis].KVProvide(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
}

// save the storage
p.storages[k] = storage
continue
default:
// otherwise - error, no local or global config
p.log.Warn("no global or local redis configuration provided", "key", configKey)
continue
}

// save the storage
p.storages[k] = storage

default:
p.log.Error("unknown storage", errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver])))
}
Expand Down
25 changes: 25 additions & 0 deletions plugins/memory/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package memory

import (
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
)

const PluginName string = "memory"

type Plugin struct {
log logger.Logger
}

func (p *Plugin) Init(log logger.Logger) error {
p.log = log
return nil
}

func (p *Plugin) PSProvide(key string) (pubsub.PubSub, error) {
return NewPubSubDriver(p.log, key)
}

func (p *Plugin) Name() string {
return PluginName
}
24 changes: 12 additions & 12 deletions plugins/websockets/memory/inMemory.go → plugins/memory/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,36 @@ import (
"google.golang.org/protobuf/proto"
)

type Plugin struct {
type PubSubDriver struct {
sync.RWMutex
log logger.Logger

// channel with the messages from the RPC
pushCh chan []byte
// user-subscribed topics
storage bst.Storage
log logger.Logger
}

func NewInMemory(log logger.Logger) pubsub.PubSub {
return &Plugin{
log: log,
func NewPubSubDriver(log logger.Logger, _ string) (pubsub.PubSub, error) {
ps := &PubSubDriver{
pushCh: make(chan []byte, 10),
storage: bst.NewBST(),
log: log,
}
return ps, nil
}

func (p *Plugin) Publish(message []byte) error {
func (p *PubSubDriver) Publish(message []byte) error {
p.pushCh <- message
return nil
}

func (p *Plugin) PublishAsync(message []byte) {
func (p *PubSubDriver) PublishAsync(message []byte) {
go func() {
p.pushCh <- message
}()
}

func (p *Plugin) Subscribe(connectionID string, topics ...string) error {
func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error {
p.Lock()
defer p.Unlock()
for i := 0; i < len(topics); i++ {
Expand All @@ -48,7 +48,7 @@ func (p *Plugin) Subscribe(connectionID string, topics ...string) error {
return nil
}

func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error {
func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error {
p.Lock()
defer p.Unlock()
for i := 0; i < len(topics); i++ {
Expand All @@ -57,7 +57,7 @@ func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error {
return nil
}

func (p *Plugin) Connections(topic string, res map[string]struct{}) {
func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
p.RLock()
defer p.RUnlock()

Expand All @@ -67,7 +67,7 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) {
}
}

func (p *Plugin) Next() (*websocketsv1.Message, error) {
func (p *PubSubDriver) Next() (*websocketsv1.Message, error) {
msg := <-p.pushCh
if msg == nil {
return nil, nil
Expand Down
Loading

0 comments on commit a38a4e6

Please sign in to comment.