Skip to content

Commit

Permalink
Make them start/stoppers
Browse files Browse the repository at this point in the history
  • Loading branch information
kentquirk committed Jun 18, 2024
1 parent c62e4a8 commit c51b365
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 13 deletions.
11 changes: 10 additions & 1 deletion pubsub/pubsub.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package pubsub

import "context"
import (
"context"

"github.com/facebookgo/startstop"
)

// general usage:
// pubsub := pubsub.NewXXXPubSub()
Expand All @@ -22,6 +26,11 @@ type PubSub interface {
NewTopic(ctx context.Context, topic string) Topic
// Close shuts down all topics and the pubsub connection.
Close()

// we want to embed startstop.Starter and startstop.Stopper so that we
// can participate in injection
startstop.Starter
startstop.Stopper
}

type Topic interface {
Expand Down
17 changes: 11 additions & 6 deletions pubsub/pubsub_goredis.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"sync"

"github.com/honeycombio/refinery/config"
"github.com/redis/go-redis/v9"
"golang.org/x/exp/maps"
)
Expand All @@ -19,6 +20,7 @@ import (
// GoRedisPubSub is a PubSub implementation that uses Redis as the message broker
// and the go-redis library to interact with Redis.
type GoRedisPubSub struct {
Config *config.Config `inject:""`
rdb *redis.Client
topics map[string]*GoRedisTopic
mut sync.RWMutex
Expand All @@ -34,18 +36,21 @@ type GoRedisTopic struct {
once sync.Once
}

func NewGoRedisPubSub() *GoRedisPubSub {

func (ps *GoRedisPubSub) Start() error {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
})

return &GoRedisPubSub{
rdb: rdb,
topics: make(map[string]*GoRedisTopic),
}
ps.rdb = rdb
ps.topics = make(map[string]*GoRedisTopic)
return nil
}

func (ps *GoRedisPubSub) Stop() error {
ps.Close()
return nil
}

// assert that GoRedisPubSub implements PubSub
Expand Down
15 changes: 11 additions & 4 deletions pubsub/pubsub_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"context"
"sync"

"github.com/honeycombio/refinery/config"
"golang.org/x/exp/maps"
)

// LocalPubSub is a PubSub implementation that uses local channels to send messages; it does
// not communicate with any external processes.
type LocalPubSub struct {
Config *config.Config `inject:""`
topics map[string]*LocalTopic
mut sync.RWMutex
}
Expand All @@ -23,10 +25,14 @@ type LocalTopic struct {
once sync.Once
}

func NewLocalPubSub() *LocalPubSub {
return &LocalPubSub{
topics: make(map[string]*LocalTopic),
}
func (ps *LocalPubSub) Start() error {
ps.topics = make(map[string]*LocalTopic)
return nil
}

func (ps *LocalPubSub) Stop() error {
ps.Close()
return nil
}

// assert that LocalPubSub implements PubSub
Expand Down Expand Up @@ -81,6 +87,7 @@ func (ps *LocalPubSub) NewTopic(ctx context.Context, topic string) Topic {
func (ps *LocalPubSub) Close() {
ps.mut.Lock()
topics := maps.Values(ps.topics)
ps.topics = make(map[string]*LocalTopic)
ps.mut.Unlock()

for _, t := range topics {
Expand Down
7 changes: 5 additions & 2 deletions pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ import (
var types = []string{"goredis", "local"}

func newPubSub(typ string) pubsub.PubSub {
var ps pubsub.PubSub
switch typ {
case "goredis":
return pubsub.NewGoRedisPubSub()
ps = &pubsub.GoRedisPubSub{}
case "local":
return pubsub.NewLocalPubSub()
ps = &pubsub.LocalPubSub{}
default:
panic("unknown pubsub type")
}
ps.Start()
return ps
}

func TestPubSubBasics(t *testing.T) {
Expand Down

0 comments on commit c51b365

Please sign in to comment.