Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

maint: add instrumentation for GoRedisPubSub #1229

Merged
merged 6 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion internal/peer/peers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace/noop"
)

func newPeers(c config.Config) (Peers, error) {
Expand Down Expand Up @@ -46,7 +47,10 @@ func newPeers(c config.Config) (Peers, error) {
pubsubber = &pubsub.GoRedisPubSub{}
}
case "redis":
pubsubber = &pubsub.GoRedisPubSub{}
pubsubber = &pubsub.GoRedisPubSub{
Metrics: &metrics.NullMetrics{},
Tracer: noop.NewTracerProvider().Tracer("test"),
}
peers = &RedisPubsubPeers{}
default:
// this should have been caught by validation
Expand Down
2 changes: 1 addition & 1 deletion internal/peer/pubsub_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (p *RedisPubsubPeers) checkHash() {
p.Metrics.Gauge("peer_hash", float64(p.hash))
}

func (p *RedisPubsubPeers) listen(msg string) {
func (p *RedisPubsubPeers) listen(ctx context.Context, msg string) {
cmd := &peerCommand{}
if !cmd.unmarshal(msg) {
return
Expand Down
5 changes: 4 additions & 1 deletion pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type PubSub interface {
// There is no unsubscribe method; close the subscription to stop receiving messages.
// The subscription only exists to provide a way to stop receiving messages; if you don't need to stop,
// you can ignore the return value.
Subscribe(ctx context.Context, topic string, callback func(msg string)) Subscription
Subscribe(ctx context.Context, topic string, callback SubscriptionCallback) Subscription
// Close shuts down all topics and the pubsub connection.
Close()

Expand All @@ -36,6 +36,9 @@ type PubSub interface {
startstop.Stopper
}

// SubscriptionCallback is the function signature for a subscription callback.
type SubscriptionCallback func(context.Context, string)

type Subscription interface {
// Close stops the subscription which means the callback will no longer be called.
// Optional; the topic will be closed when the pubsub connection is closed.
Expand Down
31 changes: 28 additions & 3 deletions pubsub/pubsub_goredis.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"strings"
"sync"

"go.opentelemetry.io/otel/trace"

"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/internal/otelutil"
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/redis/go-redis/v9"
Expand All @@ -25,6 +28,7 @@ type GoRedisPubSub struct {
Config config.Config `inject:""`
Logger logger.Logger `inject:""`
Metrics metrics.Metrics `inject:"metrics"`
Tracer trace.Tracer `inject:"tracer"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't expect to inject multiple different tracers, we don't need to name them. You could remove the name here, and in main where it's created.

client redis.UniversalClient
subs []*GoRedisSubscription
mut sync.RWMutex
Expand All @@ -36,7 +40,7 @@ var _ PubSub = (*GoRedisPubSub)(nil)
type GoRedisSubscription struct {
topic string
pubsub *redis.PubSub
cb func(msg string)
cb SubscriptionCallback
done chan struct{}
once sync.Once
}
Expand Down Expand Up @@ -111,6 +115,13 @@ func (ps *GoRedisPubSub) Close() {
}

func (ps *GoRedisPubSub) Publish(ctx context.Context, topic, message string) error {
ctx, span := otelutil.StartSpanMulti(ctx, ps.Tracer, "GoRedisPubSub.Publish", map[string]interface{}{
"topic": topic,
"message": message,
})

defer span.End()

ps.Metrics.Count("redis_pubsub_published", 1)
return ps.client.Publish(ctx, topic, message).Err()
}
Expand All @@ -119,7 +130,10 @@ func (ps *GoRedisPubSub) Publish(ctx context.Context, topic, message string) err
// whenever a message is received on that topic.
// Note that the same topic is Subscribed to multiple times, this will incur a separate
// connection to Redis for each Subscription.
func (ps *GoRedisPubSub) Subscribe(ctx context.Context, topic string, callback func(string)) Subscription {
func (ps *GoRedisPubSub) Subscribe(ctx context.Context, topic string, callback SubscriptionCallback) Subscription {
ctx, span := otelutil.StartSpanWith(ctx, ps.Tracer, "GoRedisPubSub.Subscribe", "topic", topic)
defer span.End()

sub := &GoRedisSubscription{
topic: topic,
pubsub: ps.client.Subscribe(ctx, topic),
Expand All @@ -130,6 +144,7 @@ func (ps *GoRedisPubSub) Subscribe(ctx context.Context, topic string, callback f
ps.subs = append(ps.subs, sub)
ps.mut.Unlock()
go func() {
receiveRootCtx := context.Background()
redisch := sub.pubsub.Channel()
for {
select {
Expand All @@ -139,8 +154,18 @@ func (ps *GoRedisPubSub) Subscribe(ctx context.Context, topic string, callback f
if msg == nil {
continue
}
receiveCtx, span := otelutil.StartSpanMulti(receiveRootCtx, ps.Tracer, "GoRedisPubSub.Receive", map[string]interface{}{
"topic": topic,
"message_queue_size": len(redisch),
"message": msg.Payload,
})
ps.Metrics.Count("redis_pubsub_received", 1)
go sub.cb(msg.Payload)

go func(cbCtx context.Context, span trace.Span) {
defer span.End()

sub.cb(cbCtx, msg.Payload)
}(receiveCtx, span)
VinozzZ marked this conversation as resolved.
Show resolved Hide resolved
}
}
}()
Expand Down
6 changes: 3 additions & 3 deletions pubsub/pubsub_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var _ PubSub = (*LocalPubSub)(nil)
type LocalSubscription struct {
ps *LocalPubSub
topic string
cb func(string)
cb SubscriptionCallback
mut sync.RWMutex
}

Expand Down Expand Up @@ -71,13 +71,13 @@ func (ps *LocalPubSub) Publish(ctx context.Context, topic, message string) error
for _, sub := range ps.topics[topic] {
// don't wait around for slow consumers
if sub.cb != nil {
go sub.cb(message)
go sub.cb(ctx, message)
}
}
return nil
}

func (ps *LocalPubSub) Subscribe(ctx context.Context, topic string, callback func(msg string)) Subscription {
func (ps *LocalPubSub) Subscribe(ctx context.Context, topic string, callback SubscriptionCallback) Subscription {
ps.mut.Lock()
ps.ensureTopic(topic)
sub := &LocalSubscription{ps: ps, topic: topic, cb: callback}
Expand Down
7 changes: 5 additions & 2 deletions pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/honeycombio/refinery/pubsub"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace/noop"
)

var types = []string{
Expand All @@ -23,10 +24,12 @@ func newPubSub(typ string) pubsub.PubSub {
var ps pubsub.PubSub
m := &metrics.NullMetrics{}
m.Start()
tracer := noop.NewTracerProvider().Tracer("test")
switch typ {
case "goredis":
ps = &pubsub.GoRedisPubSub{
Metrics: m,
Tracer: tracer,
}
case "local":
ps = &pubsub.LocalPubSub{
Expand All @@ -44,7 +47,7 @@ type pubsubListener struct {
msgs []string
}

func (l *pubsubListener) Listen(msg string) {
func (l *pubsubListener) Listen(ctx context.Context, msg string) {
l.lock.Lock()
defer l.lock.Unlock()
l.msgs = append(l.msgs, msg)
Expand Down Expand Up @@ -196,7 +199,7 @@ func TestPubSubLatency(t *testing.T) {
wg.Done()
}()

ps.Subscribe(ctx, "topic", func(msg string) {
ps.Subscribe(ctx, "topic", func(ctx context.Context, msg string) {
sent, err := strconv.Atoi(msg)
require.NoError(t, err)
rcvd := time.Now().UnixNano()
Expand Down
Loading