diff --git a/examples/go.mod b/examples/go.mod index 3e2b6a8..d17126f 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -9,7 +9,7 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/nats-io/nats.go v1.22.1 github.com/nats-io/stan.go v0.10.2 - github.com/redis/go-redis/v9 v9.0.2 + github.com/redis/go-redis/v9 v9.0.5 github.com/reugn/go-streams v0.9.0 github.com/reugn/go-streams/aerospike v0.0.0 github.com/reugn/go-streams/kafka v0.0.0 diff --git a/examples/go.sum b/examples/go.sum index 65a9882..cf0e556 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -77,8 +77,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM= github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q= -github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ= -github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8= +github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao= +github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= @@ -407,8 +407,8 @@ github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3x github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE= -github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps= +github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o= +github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= github.com/reugn/go-streams v0.9.0 h1:OSOBi8V4B8wb66g0+/0RyA8BkyMcxIj+b1ERVI6l/eQ= github.com/reugn/go-streams v0.9.0/go.mod h1:QI5XXifJkVJl2jQ6Cra8I9DvWdJTgqcFYR7amvXZ9Lg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= diff --git a/examples/redis/main.go b/examples/redis/pubsub/main.go similarity index 83% rename from examples/redis/main.go rename to examples/redis/pubsub/main.go index f72ac09..83e8f5d 100644 --- a/examples/redis/main.go +++ b/examples/redis/pubsub/main.go @@ -29,13 +29,15 @@ func main() { DB: 0, // use default DB } - source, err := ext.NewRedisSource(ctx, config, "test") + redisClient := redis.NewClient(config) + + source, err := ext.NewPubSubSource(ctx, redisClient, "test") if err != nil { log.Fatal(err) } toUpperMapFlow := flow.NewMap(toUpper, 1) - sink := ext.NewRedisSink(ctx, config, "test2") + sink := ext.NewPubSubSink(ctx, redisClient, "test2") source. Via(toUpperMapFlow). diff --git a/redis/go.mod b/redis/go.mod index 49ca044..c8f7fa0 100644 --- a/redis/go.mod +++ b/redis/go.mod @@ -3,7 +3,7 @@ module github.com/reugn/go-streams/redis go 1.18 require ( - github.com/redis/go-redis/v9 v9.0.2 + github.com/redis/go-redis/v9 v9.0.5 github.com/reugn/go-streams v0.9.0 ) diff --git a/redis/go.sum b/redis/go.sum index 06a3f26..d7deed1 100644 --- a/redis/go.sum +++ b/redis/go.sum @@ -1,14 +1,10 @@ -github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ= -github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8= +github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao= +github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE= -github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps= +github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o= +github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk= github.com/reugn/go-streams v0.9.0 h1:OSOBi8V4B8wb66g0+/0RyA8BkyMcxIj+b1ERVI6l/eQ= github.com/reugn/go-streams v0.9.0/go.mod h1:QI5XXifJkVJl2jQ6Cra8I9DvWdJTgqcFYR7amvXZ9Lg= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/redis/redis.go b/redis/redis.go deleted file mode 100644 index 6569645..0000000 --- a/redis/redis.go +++ /dev/null @@ -1,114 +0,0 @@ -package redis - -import ( - "context" - "log" - - "github.com/redis/go-redis/v9" - "github.com/reugn/go-streams" - "github.com/reugn/go-streams/flow" -) - -// RedisSource represents a Redis Pub/Sub source connector. -type RedisSource struct { - ctx context.Context - redisdb *redis.Client - channel string - out chan interface{} -} - -// NewRedisSource returns a new RedisSource instance. -func NewRedisSource(ctx context.Context, config *redis.Options, channel string) (*RedisSource, error) { - redisdb := redis.NewClient(config) - pubsub := redisdb.Subscribe(ctx, channel) - - // Wait for a confirmation that subscription is created before publishing anything - _, err := pubsub.Receive(ctx) - if err != nil { - return nil, err - } - - source := &RedisSource{ - ctx: ctx, - redisdb: redisdb, - channel: channel, - out: make(chan interface{}), - } - - go source.init(pubsub.Channel()) - return source, nil -} - -// init starts the main loop -func (rs *RedisSource) init(ch <-chan *redis.Message) { -loop: - for { - select { - case <-rs.ctx.Done(): - break loop - - case msg := <-ch: - rs.out <- msg - } - } - - log.Printf("Closing redis consumer") - close(rs.out) - rs.redisdb.Close() -} - -// Via streams data through the given flow -func (rs *RedisSource) Via(_flow streams.Flow) streams.Flow { - flow.DoStream(rs, _flow) - return _flow -} - -// Out returns an output channel for sending data -func (rs *RedisSource) Out() <-chan interface{} { - return rs.out -} - -// RedisSink represents a Redis Pub/Sub sink connector. -type RedisSink struct { - ctx context.Context - redisdb *redis.Client - channel string - in chan interface{} -} - -// NewRedisSink returns a new RedisSink instance. -func NewRedisSink(ctx context.Context, config *redis.Options, channel string) *RedisSink { - sink := &RedisSink{ - ctx: ctx, - redisdb: redis.NewClient(config), - channel: channel, - in: make(chan interface{}), - } - - go sink.init() - return sink -} - -// init starts the main loop -func (rs *RedisSink) init() { - for msg := range rs.in { - switch m := msg.(type) { - case string: - err := rs.redisdb.Publish(rs.ctx, rs.channel, m).Err() - if err != nil { - log.Printf("redisdb.Publish failed with: %s", err) - } - - default: - log.Printf("Unsupported message type %v", m) - } - } - - log.Printf("Closing redis producer") - rs.redisdb.Close() -} - -// In returns an input channel for receiving data -func (rs *RedisSink) In() chan<- interface{} { - return rs.in -} diff --git a/redis/redis_pubsub.go b/redis/redis_pubsub.go new file mode 100644 index 0000000..c4fdaab --- /dev/null +++ b/redis/redis_pubsub.go @@ -0,0 +1,126 @@ +package redis + +import ( + "context" + "log" + + "github.com/redis/go-redis/v9" + "github.com/reugn/go-streams" + "github.com/reugn/go-streams/flow" +) + +// PubSubSource represents a Redis Pub/Sub source connector. +// +// In the Publish/Subscribe messaging paradigm senders (publishers) +// are not programmed to send their messages to specific receivers (subscribers). +// Rather, published messages are characterized into channels, without knowledge +// of what (if any) subscribers there may be. +type PubSubSource struct { + ctx context.Context + redisClient *redis.Client + channel string + out chan interface{} +} + +// NewPubSubSource returns a new PubSubSource instance. +// +// The given redisClient is subscribed to the provided channel. +// The replies to subscription and unsubscribing operations are sent in the form of messages +// so that the client reads a coherent stream of messages where the first element +// indicates the type of message. +func NewPubSubSource(ctx context.Context, redisClient *redis.Client, channel string) (*PubSubSource, error) { + pubsub := redisClient.Subscribe(ctx, channel) + + // Wait for a confirmation that subscription is created before publishing anything + _, err := pubsub.Receive(ctx) + if err != nil { + return nil, err + } + + source := &PubSubSource{ + ctx: ctx, + redisClient: redisClient, + channel: channel, + out: make(chan interface{}), + } + + go source.init(pubsub.Channel()) + return source, nil +} + +// init starts the main loop +func (ps *PubSubSource) init(ch <-chan *redis.Message) { +loop: + for { + select { + case <-ps.ctx.Done(): + break loop + + case msg := <-ch: + ps.out <- msg + } + } + + log.Printf("Closing Redis Pub/Sub consumer") + close(ps.out) + ps.redisClient.Close() +} + +// Via streams data through the given flow +func (ps *PubSubSource) Via(_flow streams.Flow) streams.Flow { + flow.DoStream(ps, _flow) + return _flow +} + +// Out returns an output channel for sending data +func (ps *PubSubSource) Out() <-chan interface{} { + return ps.out +} + +// PubSubSink represents a Redis Pub/Sub sink connector. +type PubSubSink struct { + ctx context.Context + redisClient *redis.Client + channel string + in chan interface{} +} + +// NewPubSubSink returns a new PubSubSink instance. +// +// The incoming messages will be published to the given target channel using the +// provided redis.Client. +func NewPubSubSink(ctx context.Context, redisClient *redis.Client, channel string) *PubSubSink { + sink := &PubSubSink{ + ctx: ctx, + redisClient: redisClient, + channel: channel, + in: make(chan interface{}), + } + + go sink.init() + return sink +} + +// init starts the main loop +func (ps *PubSubSink) init() { + for msg := range ps.in { + switch m := msg.(type) { + case string: + err := ps.redisClient.Publish(ps.ctx, ps.channel, m).Err() + if err != nil { + log.Printf("Error in redisClient.Publish: %s", err) + } + + default: + log.Printf("Unsupported message type %v", m) + } + } + + log.Printf("Closing Redis Pub/Sub producer") + ps.redisClient.Close() +} + +// In returns an input channel for receiving data +func (ps *PubSubSink) In() chan<- interface{} { + return ps.in +}