Skip to content

Commit

Permalink
feat: Add metrics to pubsub and peers (#1226)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

- Adds metrics to general pubsub
- Adds metrics to peer tracking
  • Loading branch information
kentquirk authored Jul 17, 2024
1 parent e4c23d2 commit 8f61ee8
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 27 deletions.
10 changes: 6 additions & 4 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func newStartedApp(

var err error
if peers == nil {
peers = &peer.FilePeers{Cfg: c}
peers = &peer.FilePeers{Cfg: c, Metrics: &metrics.NullMetrics{}}
}

a := App{}
Expand Down Expand Up @@ -273,9 +273,11 @@ func TestAppIntegrationWithNonLegacyKey(t *testing.T) {
resp.Body.Close()

// Wait for span to be sent.
time.Sleep(2 * a.Config.GetSendTickerValue())
events := sender.Events()
require.Len(t, events, 1)
var events []*transmission.Event
require.Eventually(t, func() bool {
events = sender.Events()
return len(events) == 1
}, 2*time.Second, 2*time.Millisecond)

assert.Equal(t, "dataset", events[0].Dataset)
assert.Equal(t, "bar", events[0].Data["foo"])
Expand Down
10 changes: 8 additions & 2 deletions internal/peer/file.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package peer

import "github.com/honeycombio/refinery/config"
import (
"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/metrics"
)

type FilePeers struct {
Cfg config.Config `inject:""`
Cfg config.Config `inject:""`
Metrics metrics.Metrics `inject:"metrics"`
}

func (p *FilePeers) GetPeers() ([]string, error) {
Expand All @@ -14,6 +18,7 @@ func (p *FilePeers) GetPeers() ([]string, error) {
if len(peers) == 0 {
peers = []string{"http://127.0.0.1:8081"}
}
p.Metrics.Gauge("num_file_peers", float64(len(peers)))
return peers, err
}

Expand All @@ -24,6 +29,7 @@ func (p *FilePeers) RegisterUpdatedPeersCallback(callback func()) {
}

func (p *FilePeers) Start() error {
p.Metrics.Register("num_file_peers", "gauge")
return nil
}

Expand Down
5 changes: 4 additions & 1 deletion internal/peer/peers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/facebookgo/startstop"
"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/pubsub"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
Expand All @@ -29,7 +30,8 @@ func newPeers(c config.Config) (Peers, error) {
switch ptype {
case "file":
peers = &FilePeers{
Cfg: c,
Cfg: c,
Metrics: &metrics.NullMetrics{},
}
// we know FilePeers doesn't need to be Started, so as long as we gave it a Cfg above,
// we can ask it how many peers we have.
Expand Down Expand Up @@ -57,6 +59,7 @@ func newPeers(c config.Config) (Peers, error) {
{Value: c},
{Value: peers},
{Value: pubsubber},
{Value: &metrics.NullMetrics{}, Name: "metrics"},
{Value: &logger.NullLogger{}},
{Value: clockwork.NewFakeClock()},
}
Expand Down
15 changes: 12 additions & 3 deletions internal/peer/pubsub_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/dgryski/go-wyhash"
"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/generics"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/pubsub"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -72,9 +73,10 @@ func (p *peerCommand) marshal() string {
}

type RedisPubsubPeers struct {
Config config.Config `inject:""`
PubSub pubsub.PubSub `inject:""`
Clock clockwork.Clock `inject:""`
Config config.Config `inject:""`
Metrics metrics.Metrics `inject:"metrics"`
PubSub pubsub.PubSub `inject:""`
Clock clockwork.Clock `inject:""`

peers *generics.SetWithTTL[string]
hash uint64
Expand All @@ -93,13 +95,16 @@ func (p *RedisPubsubPeers) checkHash() {
go cb()
}
}
p.Metrics.Gauge("num_peers", float64(len(peers)))
p.Metrics.Gauge("peer_hash", float64(p.hash))
}

func (p *RedisPubsubPeers) listen(msg string) {
cmd := &peerCommand{}
if !cmd.unmarshal(msg) {
return
}
p.Metrics.Count("peer_messages", 1)
switch cmd.action {
case Unregister:
p.peers.Remove(cmd.peer)
Expand All @@ -119,6 +124,10 @@ func (p *RedisPubsubPeers) Start() error {
p.callbacks = make([]func(), 0)
p.sub = p.PubSub.Subscribe(context.Background(), "peers", p.listen)

p.Metrics.Register("num_peers", "gauge")
p.Metrics.Register("peer_hash", "gauge")
p.Metrics.Register("peer_messages", "counter")

myaddr, err := publicAddr(p.Config)
if err != nil {
return err
Expand Down
17 changes: 12 additions & 5 deletions pubsub/pubsub_goredis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/redis/go-redis/v9"
)

Expand All @@ -21,11 +22,12 @@ import (
// GoRedisPubSub is a PubSub implementation that uses Redis as the message broker
// and the go-redis library to interact with Redis.
type GoRedisPubSub struct {
Config config.Config `inject:""`
Logger logger.Logger `inject:""`
client redis.UniversalClient
subs []*GoRedisSubscription
mut sync.RWMutex
Config config.Config `inject:""`
Logger logger.Logger `inject:""`
Metrics metrics.Metrics `inject:"metrics"`
client redis.UniversalClient
subs []*GoRedisSubscription
mut sync.RWMutex
}

// Ensure that GoRedisPubSub implements PubSub
Expand All @@ -46,6 +48,9 @@ func (ps *GoRedisPubSub) Start() error {
options := &redis.UniversalOptions{}
authcode := ""

ps.Metrics.Register("redis_pubsub_published", "counter")
ps.Metrics.Register("redis_pubsub_received", "counter")

if ps.Config != nil {
host, err := ps.Config.GetRedisHost()
if err != nil {
Expand Down Expand Up @@ -106,6 +111,7 @@ func (ps *GoRedisPubSub) Close() {
}

func (ps *GoRedisPubSub) Publish(ctx context.Context, topic, message string) error {
ps.Metrics.Count("redis_pubsub_published", 1)
return ps.client.Publish(ctx, topic, message).Err()
}

Expand Down Expand Up @@ -133,6 +139,7 @@ func (ps *GoRedisPubSub) Subscribe(ctx context.Context, topic string, callback f
if msg == nil {
continue
}
ps.Metrics.Count("redis_pubsub_received", 1)
go sub.cb(msg.Payload)
}
}
Expand Down
12 changes: 9 additions & 3 deletions pubsub/pubsub_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ import (
"sync"

"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/metrics"
)

// LocalPubSub is a PubSub implementation that uses local channels to send messages; it does
// not communicate with any external processes.
// subs are individual channels for each subscription
type LocalPubSub struct {
Config config.Config `inject:""`
topics map[string][]*LocalSubscription
mut sync.RWMutex
Config config.Config `inject:""`
Metrics metrics.Metrics `inject:"metrics"`
topics map[string][]*LocalSubscription
mut sync.RWMutex
}

// Ensure that LocalPubSub implements PubSub
Expand All @@ -32,6 +34,8 @@ var _ Subscription = (*LocalSubscription)(nil)
// Start initializes the LocalPubSub
func (ps *LocalPubSub) Start() error {
ps.topics = make(map[string][]*LocalSubscription)
ps.Metrics.Register("local_pubsub_published", "counter")
ps.Metrics.Register("local_pubsub_received", "counter")
return nil
}

Expand Down Expand Up @@ -62,6 +66,8 @@ func (ps *LocalPubSub) Publish(ctx context.Context, topic, message string) error
ps.mut.Lock()
defer ps.mut.Unlock()
ps.ensureTopic(topic)
ps.Metrics.Count("local_pubsub_published", 1)
ps.Metrics.Count("local_pubsub_received", len(ps.topics[topic]))
for _, sub := range ps.topics[topic] {
// don't wait around for slow consumers
if sub.cb != nil {
Expand Down
11 changes: 9 additions & 2 deletions pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/pubsub"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -20,11 +21,17 @@ var types = []string{

func newPubSub(typ string) pubsub.PubSub {
var ps pubsub.PubSub
m := &metrics.NullMetrics{}
m.Start()
switch typ {
case "goredis":
ps = &pubsub.GoRedisPubSub{}
ps = &pubsub.GoRedisPubSub{
Metrics: m,
}
case "local":
ps = &pubsub.LocalPubSub{}
ps = &pubsub.LocalPubSub{
Metrics: m,
}
default:
panic("unknown pubsub type")
}
Expand Down
15 changes: 8 additions & 7 deletions sharder/deterministic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/internal/peer"
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/stretchr/testify/assert"
)

Expand All @@ -29,7 +30,7 @@ func TestWhichShard(t *testing.T) {
}
done := make(chan struct{})
defer close(done)
filePeers := &peer.FilePeers{Cfg: config}
filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
sharder := DeterministicSharder{
Config: config,
Logger: &logger.NullLogger{},
Expand Down Expand Up @@ -71,7 +72,7 @@ func TestWhichShardAtEdge(t *testing.T) {
}
done := make(chan struct{})
defer close(done)
filePeers := &peer.FilePeers{Cfg: config}
filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
sharder := DeterministicSharder{
Config: config,
Logger: &logger.NullLogger{},
Expand Down Expand Up @@ -122,7 +123,7 @@ func BenchmarkShardBulk(b *testing.B) {
}
done := make(chan struct{})
defer close(done)
filePeers := &peer.FilePeers{Cfg: config}
filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
sharder := DeterministicSharder{
Config: config,
Logger: &logger.NullLogger{},
Expand Down Expand Up @@ -168,7 +169,7 @@ func TestShardBulk(t *testing.T) {
}
done := make(chan struct{})
defer close(done)
filePeers := &peer.FilePeers{Cfg: config}
filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
sharder := DeterministicSharder{
Config: config,
Logger: &logger.NullLogger{},
Expand Down Expand Up @@ -240,7 +241,7 @@ func TestShardDrop(t *testing.T) {
}
done := make(chan struct{})
defer close(done)
filePeers := &peer.FilePeers{Cfg: config}
filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
sharder := DeterministicSharder{
Config: config,
Logger: &logger.NullLogger{},
Expand Down Expand Up @@ -323,7 +324,7 @@ func TestShardAddHash(t *testing.T) {
}
done := make(chan struct{})
defer close(done)
filePeers := &peer.FilePeers{Cfg: config}
filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
sharder := DeterministicSharder{
Config: config,
Logger: &logger.NullLogger{},
Expand Down Expand Up @@ -404,7 +405,7 @@ func BenchmarkDeterministicShard(b *testing.B) {
}
done := make(chan struct{})
defer close(done)
filePeers := &peer.FilePeers{Cfg: config}
filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
sharder := DeterministicSharder{
Config: config,
Logger: &logger.NullLogger{},
Expand Down

0 comments on commit 8f61ee8

Please sign in to comment.