Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Working, tested, but unused pubsub system #1205

Merged
merged 24 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
b596f44
Add SetWithTTL
kentquirk Jun 16, 2024
99d130d
Import fanout from R3
kentquirk Jun 16, 2024
1bcece8
Merge branch 'main' into kent.generics
kentquirk Jun 17, 2024
70ff107
make fanout test easier to reason about
kentquirk Jun 17, 2024
48eba39
SetWithTTL now has clockwork and is concurrency-safe
kentquirk Jun 17, 2024
fc02f89
Use maps.Keys
kentquirk Jun 17, 2024
d40968c
Merge branch 'main' into kent.generics
kentquirk Jun 17, 2024
164b206
Merge branch 'main' into kent.generics
kentquirk Jun 17, 2024
67fc1a1
working, tested, but unused pubsub system
kentquirk Jun 15, 2024
d602aaa
better limits for CI
kentquirk Jun 15, 2024
ba00942
Use sync.Once instead of closed flag
kentquirk Jun 17, 2024
c62e4a8
Further updates, add comments
kentquirk Jun 17, 2024
c51b365
Make them start/stoppers
kentquirk Jun 18, 2024
aa2a7c6
Merge branch 'main' into kent.pubsub
kentquirk Jun 18, 2024
2790c6e
Set up pubsub with config-based parms
kentquirk Jun 19, 2024
287cbd2
Wait a little longer for CI
kentquirk Jun 19, 2024
804777d
Alternate approach to timing
kentquirk Jun 19, 2024
0fb1bc3
Redesign the API (no more topics)
kentquirk Jun 21, 2024
00efa4f
Merge branch 'main' into kent.pubsub
kentquirk Jun 21, 2024
3f89fb0
Wait a bit before we send
kentquirk Jun 21, 2024
a4bbe72
Respond to feedback
kentquirk Jun 21, 2024
2e424cd
Gah, CI slowness
kentquirk Jun 21, 2024
c696875
redis client is concurrency-safe
kentquirk Jun 21, 2024
b0d09e6
Switch to universal client and support cluster.
kentquirk Jun 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.19.1
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
github.com/redis/go-redis/v9 v9.5.3
github.com/sirupsen/logrus v1.9.3
github.com/sourcegraph/conc v0.3.0
github.com/stretchr/testify v1.9.0
Expand All @@ -51,6 +52,7 @@ require (
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/facebookgo/limitgroup v0.0.0-20150612190941-6abd8d71ec01 // indirect
github.com/facebookgo/muster v0.0.0-20150708232844-fd3d7953fd52 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0 h1:jfIu9sQUG6Ig
github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
Expand All @@ -17,6 +21,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/go-wyhash v0.0.0-20191203203029-c4841ae36371 h1:bz5ApY1kzFBvw3yckuyRBCtqGvprWrKswYK468nm+Gs=
github.com/dgryski/go-wyhash v0.0.0-20191203203029-c4841ae36371/go.mod h1:/ENMIO1SQeJ5YQeUWWpbX8f+bS8INHrrhFjXgEqi4LA=
github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48 h1:fRzb/w+pyskVMQ+UbP35JkH8yB7MYb4q/qhBarqZE6g=
Expand Down Expand Up @@ -104,6 +110,8 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU=
github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
Expand Down
43 changes: 43 additions & 0 deletions pubsub/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package pubsub

import (
"context"

"github.com/facebookgo/startstop"
)

// general usage:
// pubsub := pubsub.NewXXXPubSub()
// pubsub.Start()
// defer pubsub.Stop()
// ctx := context.Background()
// pubsub.Publish(ctx, "topic", "message")
// sub := pubsub.Subscribe(ctx, "topic")
// for msg := range sub.Channel() {
// fmt.Println(msg)
// }
// sub.Close() // optional
// pubsub.Close()

type PubSub interface {
// Publish sends a message to all subscribers of the specified topic.
Publish(ctx context.Context, topic, message string) error
// Subscribe returns a Subscription that will receive all messages published to the specified topic.
// There is no unsubscribe method; close the subscription to stop receiving messages.
Subscribe(ctx context.Context, topic string) Subscription
// Close shuts down all topics and the pubsub connection.
Close()

// we want to embed startstop.Starter and startstop.Stopper so that we
// can participate in injection
startstop.Starter
startstop.Stopper
}

type Subscription interface {
// Channel returns the channel that will receive all messages published to the topic.
Channel() <-chan string
// Close stops the subscription and closes the channel. Calling this is optional;
// the topic will be closed when the pubsub connection is closed.
Close()
}
153 changes: 153 additions & 0 deletions pubsub/pubsub_goredis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package pubsub

import (
"context"
"strings"
"sync"

"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/logger"
"github.com/redis/go-redis/v9"
)

// Notes for the future: we implemented a Redis-based PubSub system using 3
// different libraries: go-redis, redigo, and rueidis. All three implementations
// perform similarly, but go-redis is definitely the easiest to use for PubSub.
// The rueidis library is probably the fastest for high-performance Redis use
// when you want Redis to be a database or cache, and it has some nice features
// like automatic pipelining, but it's pretty low-level and the documentation is
// poor. Redigo is feeling pretty old at this point.

// GoRedisPubSub is a PubSub implementation that uses Redis as the message broker
// and the go-redis library to interact with Redis.
type GoRedisPubSub struct {
Config config.Config `inject:""`
Logger logger.Logger `inject:""`
client redis.UniversalClient
subs []*GoRedisSubscription
mut sync.RWMutex
}

// Ensure that GoRedisPubSub implements PubSub
var _ PubSub = (*GoRedisPubSub)(nil)

type GoRedisSubscription struct {
topic string
pubsub *redis.PubSub
ch chan string
done chan struct{}
once sync.Once
}

// Ensure that GoRedisSubscription implements Subscription
var _ Subscription = (*GoRedisSubscription)(nil)

func (ps *GoRedisPubSub) Start() error {
options := &redis.UniversalOptions{}
authcode := ""

if ps.Config != nil {
host, err := ps.Config.GetRedisHost()
if err != nil {
return err
}
username, err := ps.Config.GetRedisUsername()
if err != nil {
return err
}
pw, err := ps.Config.GetRedisPassword()
if err != nil {
return err
}

authcode, err = ps.Config.GetRedisAuthCode()
if err != nil {
return err
}

// we may have multiple hosts, separated by commas, so split them up and
// use them as the addrs for the client (if there are multiples, it will
// create a cluster client)
hosts := strings.Split(host, ",")
options.Addrs = hosts
options.Username = username
options.Password = pw
options.DB = ps.Config.GetRedisDatabase()
}
client := redis.NewUniversalClient(options)

// if an authcode was provided, use it to authenticate the connection
if authcode != "" {
pipe := client.Pipeline()
pipe.Auth(context.Background(), authcode)
if _, err := pipe.Exec(context.Background()); err != nil {
return err
}
}

ps.client = client
ps.subs = make([]*GoRedisSubscription, 0)
return nil
}

func (ps *GoRedisPubSub) Stop() error {
ps.Close()
return nil
}

func (ps *GoRedisPubSub) Close() {
ps.mut.Lock()
for _, sub := range ps.subs {
sub.Close()
}
ps.subs = nil
ps.mut.Unlock()
ps.client.Close()
}

func (ps *GoRedisPubSub) Publish(ctx context.Context, topic, message string) error {
return ps.client.Publish(ctx, topic, message).Err()
}

func (ps *GoRedisPubSub) Subscribe(ctx context.Context, topic string) Subscription {
sub := &GoRedisSubscription{
topic: topic,
pubsub: ps.client.Subscribe(ctx, topic),
ch: make(chan string, 100),
done: make(chan struct{}),
}
ps.mut.Lock()
ps.subs = append(ps.subs, sub)
ps.mut.Unlock()
go func() {
redisch := sub.pubsub.Channel()
for {
select {
case <-sub.done:
close(sub.ch)
return
case msg := <-redisch:
if msg == nil {
continue
}
select {
case sub.ch <- msg.Payload:
default:
kentquirk marked this conversation as resolved.
Show resolved Hide resolved
ps.Logger.Warn().WithField("topic", topic).Logf("Dropping subscription message because channel is full")
}
}
}
}()
return sub
}

func (s *GoRedisSubscription) Channel() <-chan string {
return s.ch
}

func (s *GoRedisSubscription) Close() {
s.once.Do(func() {
s.pubsub.Close()
close(s.done)
})
}
102 changes: 102 additions & 0 deletions pubsub/pubsub_local.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package pubsub

import (
"context"
"sync"

"github.com/honeycombio/refinery/config"
)

// LocalPubSub is a PubSub implementation that uses local channels to send messages; it does
// not communicate with any external processes.
type LocalPubSub struct {
Config *config.Config `inject:""`
subs []*LocalSubscription
topics map[string]chan string
mut sync.RWMutex
}

// Ensure that LocalPubSub implements PubSub
var _ PubSub = (*LocalPubSub)(nil)

type LocalSubscription struct {
topic string
ch chan string
done chan struct{}
}

// Ensure that LocalSubscription implements Subscription
var _ Subscription = (*LocalSubscription)(nil)

// Start initializes the LocalPubSub
func (ps *LocalPubSub) Start() error {
ps.subs = make([]*LocalSubscription, 0)
ps.topics = make(map[string]chan string)
return nil
}

// Stop shuts down the LocalPubSub
func (ps *LocalPubSub) Stop() error {
ps.Close()
return nil
}

func (ps *LocalPubSub) Close() {
ps.mut.Lock()
defer ps.mut.Unlock()
for _, sub := range ps.subs {
sub.Close()
}
ps.subs = nil
}

func (ps *LocalPubSub) ensureTopic(topic string) chan string {
if _, ok := ps.topics[topic]; !ok {
ps.topics[topic] = make(chan string, 100)
}
return ps.topics[topic]
}

func (ps *LocalPubSub) Publish(ctx context.Context, topic, message string) error {
ps.mut.RLock()
ch := ps.ensureTopic(topic)
ps.mut.RUnlock()
select {
case ch <- message:
case <-ctx.Done():
return ctx.Err()
}
return nil
}

func (ps *LocalPubSub) Subscribe(ctx context.Context, topic string) Subscription {
ps.mut.Lock()
defer ps.mut.Unlock()
ch := ps.ensureTopic(topic)
sub := &LocalSubscription{
topic: topic,
ch: ch,
done: make(chan struct{}),
}
ps.subs = append(ps.subs, sub)
go func() {
for {
select {
case <-sub.done:
close(ch)
return
case msg := <-ch:
sub.ch <- msg
}
}
}()
return sub
}

func (s *LocalSubscription) Channel() <-chan string {
return s.ch
}

func (s *LocalSubscription) Close() {
close(s.done)
}
Loading
Loading