Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Redis publish-subscribe connector #79

Merged
merged 1 commit into from
Aug 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/nats-io/nats.go v1.22.1
github.com/nats-io/stan.go v0.10.2
github.com/redis/go-redis/v9 v9.0.2
github.com/redis/go-redis/v9 v9.0.5
github.com/reugn/go-streams v0.9.0
github.com/reugn/go-streams/aerospike v0.0.0
github.com/reugn/go-streams/kafka v0.0.0
Expand Down
8 changes: 4 additions & 4 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ=
github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8=
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
Expand Down Expand Up @@ -407,8 +407,8 @@ github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3x
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE=
github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o=
github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
github.com/reugn/go-streams v0.9.0 h1:OSOBi8V4B8wb66g0+/0RyA8BkyMcxIj+b1ERVI6l/eQ=
github.com/reugn/go-streams v0.9.0/go.mod h1:QI5XXifJkVJl2jQ6Cra8I9DvWdJTgqcFYR7amvXZ9Lg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand Down
6 changes: 4 additions & 2 deletions examples/redis/main.go → examples/redis/pubsub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ func main() {
DB: 0, // use default DB
}

source, err := ext.NewRedisSource(ctx, config, "test")
redisClient := redis.NewClient(config)

source, err := ext.NewPubSubSource(ctx, redisClient, "test")
if err != nil {
log.Fatal(err)
}

toUpperMapFlow := flow.NewMap(toUpper, 1)
sink := ext.NewRedisSink(ctx, config, "test2")
sink := ext.NewPubSubSink(ctx, redisClient, "test2")

source.
Via(toUpperMapFlow).
Expand Down
2 changes: 1 addition & 1 deletion redis/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/reugn/go-streams/redis
go 1.18

require (
github.com/redis/go-redis/v9 v9.0.2
github.com/redis/go-redis/v9 v9.0.5
github.com/reugn/go-streams v0.9.0
)

Expand Down
12 changes: 4 additions & 8 deletions redis/go.sum
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ=
github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8=
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE=
github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o=
github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
github.com/reugn/go-streams v0.9.0 h1:OSOBi8V4B8wb66g0+/0RyA8BkyMcxIj+b1ERVI6l/eQ=
github.com/reugn/go-streams v0.9.0/go.mod h1:QI5XXifJkVJl2jQ6Cra8I9DvWdJTgqcFYR7amvXZ9Lg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
114 changes: 0 additions & 114 deletions redis/redis.go

This file was deleted.

126 changes: 126 additions & 0 deletions redis/redis_pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package redis

import (
"context"
"log"

"github.com/redis/go-redis/v9"
"github.com/reugn/go-streams"
"github.com/reugn/go-streams/flow"
)

// PubSubSource represents a Redis Pub/Sub source connector.
//
// In the Publish/Subscribe messaging paradigm senders (publishers)
// are not programmed to send their messages to specific receivers (subscribers).
// Rather, published messages are characterized into channels, without knowledge
// of what (if any) subscribers there may be.
type PubSubSource struct {
ctx context.Context
redisClient *redis.Client
channel string
out chan interface{}
}

// NewPubSubSource returns a new PubSubSource instance.
//
// The given redisClient is subscribed to the provided channel.
// The replies to subscription and unsubscribing operations are sent in the form of messages
// so that the client reads a coherent stream of messages where the first element
// indicates the type of message.
func NewPubSubSource(ctx context.Context, redisClient *redis.Client, channel string) (*PubSubSource, error) {
pubsub := redisClient.Subscribe(ctx, channel)

// Wait for a confirmation that subscription is created before publishing anything
_, err := pubsub.Receive(ctx)
if err != nil {
return nil, err
}

source := &PubSubSource{
ctx: ctx,
redisClient: redisClient,
channel: channel,
out: make(chan interface{}),
}

go source.init(pubsub.Channel())
return source, nil
}

// init starts the main loop
func (ps *PubSubSource) init(ch <-chan *redis.Message) {
loop:
for {
select {
case <-ps.ctx.Done():
break loop

case msg := <-ch:
ps.out <- msg
}
}

log.Printf("Closing Redis Pub/Sub consumer")
close(ps.out)
ps.redisClient.Close()
}

// Via streams data through the given flow
func (ps *PubSubSource) Via(_flow streams.Flow) streams.Flow {
flow.DoStream(ps, _flow)
return _flow
}

// Out returns an output channel for sending data
func (ps *PubSubSource) Out() <-chan interface{} {
return ps.out
}

// PubSubSink represents a Redis Pub/Sub sink connector.
type PubSubSink struct {
ctx context.Context
redisClient *redis.Client
channel string
in chan interface{}
}

// NewPubSubSink returns a new PubSubSink instance.
//
// The incoming messages will be published to the given target channel using the
// provided redis.Client.
func NewPubSubSink(ctx context.Context, redisClient *redis.Client, channel string) *PubSubSink {
sink := &PubSubSink{
ctx: ctx,
redisClient: redisClient,
channel: channel,
in: make(chan interface{}),
}

go sink.init()
return sink
}

// init starts the main loop
func (ps *PubSubSink) init() {
for msg := range ps.in {
switch m := msg.(type) {
case string:
err := ps.redisClient.Publish(ps.ctx, ps.channel, m).Err()
if err != nil {
log.Printf("Error in redisClient.Publish: %s", err)
}

default:
log.Printf("Unsupported message type %v", m)
}
}

log.Printf("Closing Redis Pub/Sub producer")
ps.redisClient.Close()
}

// In returns an input channel for receiving data
func (ps *PubSubSink) In() chan<- interface{} {
return ps.in
}