Skip to content

Commit

Permalink
Merge pull request #4 from goware/goware-channel-go-redis
Browse files Browse the repository at this point in the history
use goware/channel and switch to go-redis library
  • Loading branch information
xiam authored Jan 4, 2023
2 parents aaac68b + d5317b0 commit 35d21ad
Show file tree
Hide file tree
Showing 13 changed files with 445 additions and 374 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

test:
go clean -testcache && go test -v -race ./...
41 changes: 5 additions & 36 deletions _examples/demo-redisbus-chat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"sync"
"time"

"github.com/gomodule/redigo/redis"
"github.com/go-redis/redis/v8"
"github.com/goware/logger"
"github.com/goware/pubsub/redisbus"
)
Expand All @@ -30,13 +30,12 @@ func main() {
serverName := *fServerName

// Setup redis
redisPool, err := NewRedisPool("localhost")
if err != nil {
log.Fatal(err)
}
redisClient := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})

// Setup pubsub
bus, err := redisbus.New[Message](logger.NewLogger(logger.LogLevel_DEBUG), redisPool, MessageEncoder[Message]{})
bus, err := redisbus.New[Message](logger.NewLogger(logger.LogLevel_DEBUG), redisClient, MessageEncoder[Message]{})
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -123,33 +122,3 @@ func main() {

wg.Wait()
}

func NewRedisPool(host string) (*redis.Pool, error) {
dialFn := func() (redis.Conn, error) {
addr := fmt.Sprintf("%s:%d", host, 6379)

conn, err := redis.Dial("tcp", addr, redis.DialDatabase(0))
if err != nil {
return nil, fmt.Errorf("could not dial redis host: %w", err)
}

return conn, nil
}

pool := &redis.Pool{
Dial: dialFn,
TestOnBorrow: func(conn redis.Conn, t time.Time) error {
_, err := conn.Do("PING")
return fmt.Errorf("PING failed: %w", err)
},
}

conn := pool.Get()
defer conn.Close()

if err := conn.Err(); err != nil {
return nil, err
}

return pool, nil
}
41 changes: 5 additions & 36 deletions _examples/demo-redisbus-gob/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,17 @@ import (
"log"
"time"

"github.com/gomodule/redigo/redis"
"github.com/go-redis/redis/v8"
"github.com/goware/logger"
"github.com/goware/pubsub/redisbus"
)

func main() {
redisPool, err := NewRedisPool("localhost")
if err != nil {
log.Fatal(err)
}
redisClient := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})

bus, err := redisbus.New[Message](logger.NewLogger(logger.LogLevel_DEBUG), redisPool, MessageEncoder[Message]{})
bus, err := redisbus.New[Message](logger.NewLogger(logger.LogLevel_DEBUG), redisClient, MessageEncoder[Message]{})
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -89,33 +88,3 @@ loop:
}
}
}

func NewRedisPool(host string) (*redis.Pool, error) {
dialFn := func() (redis.Conn, error) {
addr := fmt.Sprintf("%s:%d", host, 6379)

conn, err := redis.Dial("tcp", addr, redis.DialDatabase(0))
if err != nil {
return nil, fmt.Errorf("could not dial redis host: %w", err)
}

return conn, nil
}

pool := &redis.Pool{
Dial: dialFn,
TestOnBorrow: func(conn redis.Conn, t time.Time) error {
_, err := conn.Do("PING")
return fmt.Errorf("PING failed: %w", err)
},
}

conn := pool.Get()
defer conn.Close()

if err := conn.Err(); err != nil {
return nil, err
}

return pool, nil
}
41 changes: 5 additions & 36 deletions _examples/demo-redisbus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"log"
"time"

"github.com/gomodule/redigo/redis"
"github.com/go-redis/redis/v8"
"github.com/goware/logger"
"github.com/goware/pubsub/redisbus"
)
Expand All @@ -16,12 +16,11 @@ type Message struct {
}

func main() {
redisPool, err := NewRedisPool("localhost")
if err != nil {
log.Fatal(err)
}
redisClient := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})

bus, err := redisbus.New[Message](logger.NewLogger(logger.LogLevel_DEBUG), redisPool)
bus, err := redisbus.New[Message](logger.NewLogger(logger.LogLevel_DEBUG), redisClient)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -87,33 +86,3 @@ loop:
// sub2.Unsubscribe()
// sub3.Unsubscribe()
}

func NewRedisPool(host string) (*redis.Pool, error) {
dialFn := func() (redis.Conn, error) {
addr := fmt.Sprintf("%s:%d", host, 6379)

conn, err := redis.Dial("tcp", addr, redis.DialDatabase(0))
if err != nil {
return nil, fmt.Errorf("could not dial redis host: %w", err)
}

return conn, nil
}

pool := &redis.Pool{
Dial: dialFn,
TestOnBorrow: func(conn redis.Conn, t time.Time) error {
_, err := conn.Do("PING")
return fmt.Errorf("PING failed: %w", err)
},
}

conn := pool.Get()
defer conn.Close()

if err := conn.Err(); err != nil {
return nil, err
}

return pool, nil
}
10 changes: 7 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ go 1.18
require github.com/gomodule/redigo v1.8.9

require (
github.com/davecgh/go-spew v1.1.1
github.com/go-redis/redis/v8 v8.11.5
github.com/goware/channel v0.2.1
github.com/goware/logger v0.1.0
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.8.1
)

require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
30 changes: 27 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,15 +1,39 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws=
github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
github.com/goware/channel v0.2.1 h1:sR3yOpg6+YO0do2bzcqFUGF5DLN/CK+lgKU0D1xCBw4=
github.com/goware/channel v0.2.1/go.mod h1:R1EdaSW0bQ7A6KvEtD/FZC4ZLrnf/TMnBrzzwXVfT7M=
github.com/goware/logger v0.1.0 h1:VB38nDsvhqPIRom/xi2iA3wq8WJRqwQx9liNT1PLGF8=
github.com/goware/logger v0.1.0/go.mod h1:IC34c5H56R1I4/R/d51aQhzHsjSJqkQyIHyuJxOiu0w=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
18 changes: 6 additions & 12 deletions membus/membus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"sync/atomic"

"github.com/goware/channel"
"github.com/goware/logger"
"github.com/goware/pubsub"
)
Expand Down Expand Up @@ -81,10 +82,7 @@ func (m *MemBus[M]) Publish(ctx context.Context, channelID string, message M) er
}

for _, sub := range m.subscribers[channelID] {
select {
case <-sub.done:
case sub.sendCh <- message:
}
sub.ch.Send(message)
}

return nil
Expand All @@ -103,24 +101,20 @@ func (m *MemBus[M]) Subscribe(ctx context.Context, channelID string) (pubsub.Sub
m.subscribers[channelID] = []*subscriber[M]{}
}

ch := make(chan M)
subscriber := &subscriber[M]{
pubsub: m,
channelID: channelID,
ch: ch,
sendCh: pubsub.MakeUnboundedBufferedChan(ch, m.log, 100),
ch: channel.NewUnboundedChan[M](m.log, 100, 10_000),
done: make(chan struct{}),
}

subscriber.unsubscribe = func() {
close(subscriber.done)
subscriber.ch.Close()
subscriber.ch.Flush()

m.mu.Lock()
defer m.mu.Unlock()
close(subscriber.sendCh)

// flush subscriber.ch so that the MakeUnboundedBuffered goroutine exits
for ok := true; ok; _, ok = <-subscriber.ch {
}

for i, sub := range m.subscribers[channelID] {
if sub == subscriber {
Expand Down
6 changes: 3 additions & 3 deletions membus/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"sync"

"github.com/goware/channel"
"github.com/goware/pubsub"
)

Expand All @@ -12,8 +13,7 @@ var _ pubsub.Subscription[any] = &subscriber[any]{}
type subscriber[M any] struct {
pubsub pubsub.PubSub[M]
channelID string
ch <-chan M
sendCh chan<- M
ch channel.Channel[M]
done chan struct{}
unsubscribe func()
unsubscribeOnce sync.Once
Expand All @@ -28,7 +28,7 @@ func (s *subscriber[M]) SendMessage(ctx context.Context, message M) error {
}

func (s *subscriber[M]) ReadMessage() <-chan M {
return s.ch
return s.ch.ReadChannel()
}

func (s *subscriber[M]) Done() <-chan struct{} {
Expand Down
Loading

0 comments on commit 35d21ad

Please sign in to comment.