Skip to content

Commit

Permalink
feat: Working, tested, but unused pubsub system (#1205)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

- In order to implement a gossip protocol, we need a functional pubsub
system. I tried the go-redis library instead of redigo and it was pretty
simple to use.

## Short description of the changes

- Create a standardized pubsub interface
- Implement go-redis version of it
- Implement local version of it
- Write tests
- Write benchmarks
- Implement startstop
- Set up based on our redis peer config (I have not used all of the pool
size parameters, since they don't seem to matter anymore)

On my local machine (without going through an external network), it runs
at about 20K messages per second, with average latency of about 200uS
and max latency of about 5mS. On CI, the first test had avg latency of
6mS.

I also tried implementing a version using the reuidis library; see
comments in the code. Basically, for pubsub it was no faster and the
implementation was a bit ugly so I did not include it.
  • Loading branch information
kentquirk authored Jun 21, 2024
1 parent 4a7077d commit 6536ea8
Show file tree
Hide file tree
Showing 6 changed files with 577 additions and 0 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.19.1
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
github.com/redis/go-redis/v9 v9.5.3
github.com/sirupsen/logrus v1.9.3
github.com/sourcegraph/conc v0.3.0
github.com/stretchr/testify v1.9.0
Expand All @@ -51,6 +52,7 @@ require (
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/facebookgo/limitgroup v0.0.0-20150612190941-6abd8d71ec01 // indirect
github.com/facebookgo/muster v0.0.0-20150708232844-fd3d7953fd52 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0 h1:jfIu9sQUG6Ig
github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
Expand All @@ -17,6 +21,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dgryski/go-wyhash v0.0.0-20191203203029-c4841ae36371 h1:bz5ApY1kzFBvw3yckuyRBCtqGvprWrKswYK468nm+Gs=
github.com/dgryski/go-wyhash v0.0.0-20191203203029-c4841ae36371/go.mod h1:/ENMIO1SQeJ5YQeUWWpbX8f+bS8INHrrhFjXgEqi4LA=
github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48 h1:fRzb/w+pyskVMQ+UbP35JkH8yB7MYb4q/qhBarqZE6g=
Expand Down Expand Up @@ -104,6 +110,8 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU=
github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
Expand Down
43 changes: 43 additions & 0 deletions pubsub/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package pubsub

import (
"context"

"github.com/facebookgo/startstop"
)

// general usage:
// pubsub := pubsub.NewXXXPubSub()
// pubsub.Start()
// defer pubsub.Stop()
// ctx := context.Background()
// pubsub.Publish(ctx, "topic", "message")
// sub := pubsub.Subscribe(ctx, "topic")
// for msg := range sub.Channel() {
// fmt.Println(msg)
// }
// sub.Close() // optional
// pubsub.Close()

type PubSub interface {
// Publish sends a message to all subscribers of the specified topic.
Publish(ctx context.Context, topic, message string) error
// Subscribe returns a Subscription that will receive all messages published to the specified topic.
// There is no unsubscribe method; close the subscription to stop receiving messages.
Subscribe(ctx context.Context, topic string) Subscription
// Close shuts down all topics and the pubsub connection.
Close()

// we want to embed startstop.Starter and startstop.Stopper so that we
// can participate in injection
startstop.Starter
startstop.Stopper
}

type Subscription interface {
// Channel returns the channel that will receive all messages published to the topic.
Channel() <-chan string
// Close stops the subscription and closes the channel. Calling this is optional;
// the topic will be closed when the pubsub connection is closed.
Close()
}
153 changes: 153 additions & 0 deletions pubsub/pubsub_goredis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package pubsub

import (
"context"
"strings"
"sync"

"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/logger"
"github.com/redis/go-redis/v9"
)

// Notes for the future: we implemented a Redis-based PubSub system using 3
// different libraries: go-redis, redigo, and rueidis. All three implementations
// perform similarly, but go-redis is definitely the easiest to use for PubSub.
// The rueidis library is probably the fastest for high-performance Redis use
// when you want Redis to be a database or cache, and it has some nice features
// like automatic pipelining, but it's pretty low-level and the documentation is
// poor. Redigo is feeling pretty old at this point.

// GoRedisPubSub is a PubSub implementation that uses Redis as the message broker
// and the go-redis library to interact with Redis.
type GoRedisPubSub struct {
Config config.Config `inject:""`
Logger logger.Logger `inject:""`
client redis.UniversalClient
subs []*GoRedisSubscription
mut sync.RWMutex
}

// Ensure that GoRedisPubSub implements PubSub
var _ PubSub = (*GoRedisPubSub)(nil)

type GoRedisSubscription struct {
topic string
pubsub *redis.PubSub
ch chan string
done chan struct{}
once sync.Once
}

// Ensure that GoRedisSubscription implements Subscription
var _ Subscription = (*GoRedisSubscription)(nil)

func (ps *GoRedisPubSub) Start() error {
options := &redis.UniversalOptions{}
authcode := ""

if ps.Config != nil {
host, err := ps.Config.GetRedisHost()
if err != nil {
return err
}
username, err := ps.Config.GetRedisUsername()
if err != nil {
return err
}
pw, err := ps.Config.GetRedisPassword()
if err != nil {
return err
}

authcode, err = ps.Config.GetRedisAuthCode()
if err != nil {
return err
}

// we may have multiple hosts, separated by commas, so split them up and
// use them as the addrs for the client (if there are multiples, it will
// create a cluster client)
hosts := strings.Split(host, ",")
options.Addrs = hosts
options.Username = username
options.Password = pw
options.DB = ps.Config.GetRedisDatabase()
}
client := redis.NewUniversalClient(options)

// if an authcode was provided, use it to authenticate the connection
if authcode != "" {
pipe := client.Pipeline()
pipe.Auth(context.Background(), authcode)
if _, err := pipe.Exec(context.Background()); err != nil {
return err
}
}

ps.client = client
ps.subs = make([]*GoRedisSubscription, 0)
return nil
}

func (ps *GoRedisPubSub) Stop() error {
ps.Close()
return nil
}

func (ps *GoRedisPubSub) Close() {
ps.mut.Lock()
for _, sub := range ps.subs {
sub.Close()
}
ps.subs = nil
ps.mut.Unlock()
ps.client.Close()
}

func (ps *GoRedisPubSub) Publish(ctx context.Context, topic, message string) error {
return ps.client.Publish(ctx, topic, message).Err()
}

func (ps *GoRedisPubSub) Subscribe(ctx context.Context, topic string) Subscription {
sub := &GoRedisSubscription{
topic: topic,
pubsub: ps.client.Subscribe(ctx, topic),
ch: make(chan string, 100),
done: make(chan struct{}),
}
ps.mut.Lock()
ps.subs = append(ps.subs, sub)
ps.mut.Unlock()
go func() {
redisch := sub.pubsub.Channel()
for {
select {
case <-sub.done:
close(sub.ch)
return
case msg := <-redisch:
if msg == nil {
continue
}
select {
case sub.ch <- msg.Payload:
default:
ps.Logger.Warn().WithField("topic", topic).Logf("Dropping subscription message because channel is full")
}
}
}
}()
return sub
}

func (s *GoRedisSubscription) Channel() <-chan string {
return s.ch
}

func (s *GoRedisSubscription) Close() {
s.once.Do(func() {
s.pubsub.Close()
close(s.done)
})
}
102 changes: 102 additions & 0 deletions pubsub/pubsub_local.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package pubsub

import (
"context"
"sync"

"github.com/honeycombio/refinery/config"
)

// LocalPubSub is a PubSub implementation that uses local channels to send messages; it does
// not communicate with any external processes.
type LocalPubSub struct {
Config *config.Config `inject:""`
subs []*LocalSubscription
topics map[string]chan string
mut sync.RWMutex
}

// Ensure that LocalPubSub implements PubSub
var _ PubSub = (*LocalPubSub)(nil)

type LocalSubscription struct {
topic string
ch chan string
done chan struct{}
}

// Ensure that LocalSubscription implements Subscription
var _ Subscription = (*LocalSubscription)(nil)

// Start initializes the LocalPubSub
func (ps *LocalPubSub) Start() error {
ps.subs = make([]*LocalSubscription, 0)
ps.topics = make(map[string]chan string)
return nil
}

// Stop shuts down the LocalPubSub
func (ps *LocalPubSub) Stop() error {
ps.Close()
return nil
}

func (ps *LocalPubSub) Close() {
ps.mut.Lock()
defer ps.mut.Unlock()
for _, sub := range ps.subs {
sub.Close()
}
ps.subs = nil
}

func (ps *LocalPubSub) ensureTopic(topic string) chan string {
if _, ok := ps.topics[topic]; !ok {
ps.topics[topic] = make(chan string, 100)
}
return ps.topics[topic]
}

func (ps *LocalPubSub) Publish(ctx context.Context, topic, message string) error {
ps.mut.RLock()
ch := ps.ensureTopic(topic)
ps.mut.RUnlock()
select {
case ch <- message:
case <-ctx.Done():
return ctx.Err()
}
return nil
}

func (ps *LocalPubSub) Subscribe(ctx context.Context, topic string) Subscription {
ps.mut.Lock()
defer ps.mut.Unlock()
ch := ps.ensureTopic(topic)
sub := &LocalSubscription{
topic: topic,
ch: ch,
done: make(chan struct{}),
}
ps.subs = append(ps.subs, sub)
go func() {
for {
select {
case <-sub.done:
close(ch)
return
case msg := <-ch:
sub.ch <- msg
}
}
}()
return sub
}

func (s *LocalSubscription) Channel() <-chan string {
return s.ch
}

func (s *LocalSubscription) Close() {
close(s.done)
}
Loading

0 comments on commit 6536ea8

Please sign in to comment.