Skip to content

Commit

Permalink
refactor Redis publish-subscribe connector (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Aug 19, 2023
1 parent aaf190e commit f53d032
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 130 deletions.
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/nats-io/nats.go v1.22.1
github.com/nats-io/stan.go v0.10.2
github.com/redis/go-redis/v9 v9.0.2
github.com/redis/go-redis/v9 v9.0.5
github.com/reugn/go-streams v0.9.0
github.com/reugn/go-streams/aerospike v0.0.0
github.com/reugn/go-streams/kafka v0.0.0
Expand Down
8 changes: 4 additions & 4 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ=
github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8=
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
Expand Down Expand Up @@ -407,8 +407,8 @@ github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3x
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE=
github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o=
github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
github.com/reugn/go-streams v0.9.0 h1:OSOBi8V4B8wb66g0+/0RyA8BkyMcxIj+b1ERVI6l/eQ=
github.com/reugn/go-streams v0.9.0/go.mod h1:QI5XXifJkVJl2jQ6Cra8I9DvWdJTgqcFYR7amvXZ9Lg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand Down
6 changes: 4 additions & 2 deletions examples/redis/main.go → examples/redis/pubsub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ func main() {
DB: 0, // use default DB
}

source, err := ext.NewRedisSource(ctx, config, "test")
redisClient := redis.NewClient(config)

source, err := ext.NewPubSubSource(ctx, redisClient, "test")
if err != nil {
log.Fatal(err)
}

toUpperMapFlow := flow.NewMap(toUpper, 1)
sink := ext.NewRedisSink(ctx, config, "test2")
sink := ext.NewPubSubSink(ctx, redisClient, "test2")

source.
Via(toUpperMapFlow).
Expand Down
2 changes: 1 addition & 1 deletion redis/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/reugn/go-streams/redis
go 1.18

require (
github.com/redis/go-redis/v9 v9.0.2
github.com/redis/go-redis/v9 v9.0.5
github.com/reugn/go-streams v0.9.0
)

Expand Down
12 changes: 4 additions & 8 deletions redis/go.sum
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ=
github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8=
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE=
github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o=
github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
github.com/reugn/go-streams v0.9.0 h1:OSOBi8V4B8wb66g0+/0RyA8BkyMcxIj+b1ERVI6l/eQ=
github.com/reugn/go-streams v0.9.0/go.mod h1:QI5XXifJkVJl2jQ6Cra8I9DvWdJTgqcFYR7amvXZ9Lg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
114 changes: 0 additions & 114 deletions redis/redis.go

This file was deleted.

126 changes: 126 additions & 0 deletions redis/redis_pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package redis

import (
"context"
"log"

"github.com/redis/go-redis/v9"
"github.com/reugn/go-streams"
"github.com/reugn/go-streams/flow"
)

// PubSubSource represents a Redis Pub/Sub source connector.
//
// In the Publish/Subscribe messaging paradigm senders (publishers)
// are not programmed to send their messages to specific receivers (subscribers).
// Rather, published messages are characterized into channels, without knowledge
// of what (if any) subscribers there may be.
type PubSubSource struct {
ctx context.Context
redisClient *redis.Client
channel string
out chan interface{}
}

// NewPubSubSource returns a new PubSubSource instance.
//
// The given redisClient is subscribed to the provided channel.
// The replies to subscription and unsubscribing operations are sent in the form of messages
// so that the client reads a coherent stream of messages where the first element
// indicates the type of message.
func NewPubSubSource(ctx context.Context, redisClient *redis.Client, channel string) (*PubSubSource, error) {
pubsub := redisClient.Subscribe(ctx, channel)

// Wait for a confirmation that subscription is created before publishing anything
_, err := pubsub.Receive(ctx)
if err != nil {
return nil, err
}

source := &PubSubSource{
ctx: ctx,
redisClient: redisClient,
channel: channel,
out: make(chan interface{}),
}

go source.init(pubsub.Channel())
return source, nil
}

// init starts the main loop
func (ps *PubSubSource) init(ch <-chan *redis.Message) {
loop:
for {
select {
case <-ps.ctx.Done():
break loop

case msg := <-ch:
ps.out <- msg
}
}

log.Printf("Closing Redis Pub/Sub consumer")
close(ps.out)
ps.redisClient.Close()
}

// Via streams data through the given flow
func (ps *PubSubSource) Via(_flow streams.Flow) streams.Flow {
flow.DoStream(ps, _flow)
return _flow
}

// Out returns an output channel for sending data
func (ps *PubSubSource) Out() <-chan interface{} {
return ps.out
}

// PubSubSink represents a Redis Pub/Sub sink connector.
type PubSubSink struct {
ctx context.Context
redisClient *redis.Client
channel string
in chan interface{}
}

// NewPubSubSink returns a new PubSubSink instance.
//
// The incoming messages will be published to the given target channel using the
// provided redis.Client.
func NewPubSubSink(ctx context.Context, redisClient *redis.Client, channel string) *PubSubSink {
sink := &PubSubSink{
ctx: ctx,
redisClient: redisClient,
channel: channel,
in: make(chan interface{}),
}

go sink.init()
return sink
}

// init starts the main loop
func (ps *PubSubSink) init() {
for msg := range ps.in {
switch m := msg.(type) {
case string:
err := ps.redisClient.Publish(ps.ctx, ps.channel, m).Err()
if err != nil {
log.Printf("Error in redisClient.Publish: %s", err)
}

default:
log.Printf("Unsupported message type %v", m)
}
}

log.Printf("Closing Redis Pub/Sub producer")
ps.redisClient.Close()
}

// In returns an input channel for receiving data
func (ps *PubSubSink) In() chan<- interface{} {
return ps.in
}

0 comments on commit f53d032

Please sign in to comment.