From 95f880a19c6797e6739e7946f4470c7ede1078c8 Mon Sep 17 00:00:00 2001 From: Kent Quirk Date: Tue, 16 Jul 2024 19:18:23 -0400 Subject: [PATCH 1/3] Add metrics to pubsub --- internal/peer/file.go | 10 ++++++++-- internal/peer/peers_test.go | 2 ++ internal/peer/pubsub_redis.go | 15 ++++++++++++--- pubsub/pubsub_goredis.go | 17 ++++++++++++----- pubsub/pubsub_local.go | 12 +++++++++--- pubsub/pubsub_test.go | 11 +++++++++-- 6 files changed, 52 insertions(+), 15 deletions(-) diff --git a/internal/peer/file.go b/internal/peer/file.go index 5ec68859cb..9c5f1399b5 100644 --- a/internal/peer/file.go +++ b/internal/peer/file.go @@ -1,9 +1,13 @@ package peer -import "github.com/honeycombio/refinery/config" +import ( + "github.com/honeycombio/refinery/config" + "github.com/honeycombio/refinery/metrics" +) type FilePeers struct { - Cfg config.Config `inject:""` + Cfg config.Config `inject:""` + Metrics metrics.Metrics `inject:"metrics"` } func (p *FilePeers) GetPeers() ([]string, error) { @@ -14,6 +18,7 @@ func (p *FilePeers) GetPeers() ([]string, error) { if len(peers) == 0 { peers = []string{"http://127.0.0.1:8081"} } + p.Metrics.Gauge("num_file_peers", float64(len(peers))) return peers, err } @@ -24,6 +29,7 @@ func (p *FilePeers) RegisterUpdatedPeersCallback(callback func()) { } func (p *FilePeers) Start() error { + p.Metrics.Register("num_file_peers", "gauge") return nil } diff --git a/internal/peer/peers_test.go b/internal/peer/peers_test.go index 98a2c6d4f3..fe20f7a61f 100644 --- a/internal/peer/peers_test.go +++ b/internal/peer/peers_test.go @@ -12,6 +12,7 @@ import ( "github.com/facebookgo/startstop" "github.com/honeycombio/refinery/config" "github.com/honeycombio/refinery/logger" + "github.com/honeycombio/refinery/metrics" "github.com/honeycombio/refinery/pubsub" "github.com/jonboulle/clockwork" "github.com/sirupsen/logrus" @@ -57,6 +58,7 @@ func newPeers(c config.Config) (Peers, error) { {Value: c}, {Value: peers}, {Value: pubsubber}, + {Value: &metrics.NullMetrics{}, Name: "metrics"}, {Value: &logger.NullLogger{}}, {Value: clockwork.NewFakeClock()}, } diff --git a/internal/peer/pubsub_redis.go b/internal/peer/pubsub_redis.go index 033751c8ed..fe6976e12e 100644 --- a/internal/peer/pubsub_redis.go +++ b/internal/peer/pubsub_redis.go @@ -12,6 +12,7 @@ import ( "github.com/dgryski/go-wyhash" "github.com/honeycombio/refinery/config" "github.com/honeycombio/refinery/generics" + "github.com/honeycombio/refinery/metrics" "github.com/honeycombio/refinery/pubsub" "github.com/jonboulle/clockwork" "github.com/sirupsen/logrus" @@ -72,9 +73,10 @@ func (p *peerCommand) marshal() string { } type RedisPubsubPeers struct { - Config config.Config `inject:""` - PubSub pubsub.PubSub `inject:""` - Clock clockwork.Clock `inject:""` + Config config.Config `inject:""` + Metrics metrics.Metrics `inject:"metrics"` + PubSub pubsub.PubSub `inject:""` + Clock clockwork.Clock `inject:""` peers *generics.SetWithTTL[string] hash uint64 @@ -93,6 +95,8 @@ func (p *RedisPubsubPeers) checkHash() { go cb() } } + p.Metrics.Gauge("num_peers", float64(len(peers))) + p.Metrics.Gauge("peer_hash", float64(p.hash)) } func (p *RedisPubsubPeers) listen(msg string) { @@ -100,6 +104,7 @@ func (p *RedisPubsubPeers) listen(msg string) { if !cmd.unmarshal(msg) { return } + p.Metrics.Count("peer_messages", 1) switch cmd.action { case Unregister: p.peers.Remove(cmd.peer) @@ -119,6 +124,10 @@ func (p *RedisPubsubPeers) Start() error { p.callbacks = make([]func(), 0) p.sub = p.PubSub.Subscribe(context.Background(), "peers", p.listen) + p.Metrics.Register("num_peers", "gauge") + p.Metrics.Register("peer_hash", "gauge") + p.Metrics.Register("peer_messages", "counter") + myaddr, err := publicAddr(p.Config) if err != nil { return err diff --git a/pubsub/pubsub_goredis.go b/pubsub/pubsub_goredis.go index c9014b73cd..331f468454 100644 --- a/pubsub/pubsub_goredis.go +++ b/pubsub/pubsub_goredis.go @@ -7,6 +7,7 @@ import ( "github.com/honeycombio/refinery/config" "github.com/honeycombio/refinery/logger" + "github.com/honeycombio/refinery/metrics" "github.com/redis/go-redis/v9" ) @@ -21,11 +22,12 @@ import ( // GoRedisPubSub is a PubSub implementation that uses Redis as the message broker // and the go-redis library to interact with Redis. type GoRedisPubSub struct { - Config config.Config `inject:""` - Logger logger.Logger `inject:""` - client redis.UniversalClient - subs []*GoRedisSubscription - mut sync.RWMutex + Config config.Config `inject:""` + Logger logger.Logger `inject:""` + Metrics metrics.Metrics `inject:"metrics"` + client redis.UniversalClient + subs []*GoRedisSubscription + mut sync.RWMutex } // Ensure that GoRedisPubSub implements PubSub @@ -46,6 +48,9 @@ func (ps *GoRedisPubSub) Start() error { options := &redis.UniversalOptions{} authcode := "" + ps.Metrics.Register("redis_pubsub_published", "counter") + ps.Metrics.Register("redis_pubsub_received", "counter") + if ps.Config != nil { host, err := ps.Config.GetRedisHost() if err != nil { @@ -106,6 +111,7 @@ func (ps *GoRedisPubSub) Close() { } func (ps *GoRedisPubSub) Publish(ctx context.Context, topic, message string) error { + ps.Metrics.Count("redis_pubsub_published", 1) return ps.client.Publish(ctx, topic, message).Err() } @@ -133,6 +139,7 @@ func (ps *GoRedisPubSub) Subscribe(ctx context.Context, topic string, callback f if msg == nil { continue } + ps.Metrics.Count("redis_pubsub_received", 1) go sub.cb(msg.Payload) } } diff --git a/pubsub/pubsub_local.go b/pubsub/pubsub_local.go index 2472a66678..8b7b985e86 100644 --- a/pubsub/pubsub_local.go +++ b/pubsub/pubsub_local.go @@ -5,15 +5,17 @@ import ( "sync" "github.com/honeycombio/refinery/config" + "github.com/honeycombio/refinery/metrics" ) // LocalPubSub is a PubSub implementation that uses local channels to send messages; it does // not communicate with any external processes. // subs are individual channels for each subscription type LocalPubSub struct { - Config config.Config `inject:""` - topics map[string][]*LocalSubscription - mut sync.RWMutex + Config config.Config `inject:""` + Metrics metrics.Metrics `inject:"metrics"` + topics map[string][]*LocalSubscription + mut sync.RWMutex } // Ensure that LocalPubSub implements PubSub @@ -32,6 +34,8 @@ var _ Subscription = (*LocalSubscription)(nil) // Start initializes the LocalPubSub func (ps *LocalPubSub) Start() error { ps.topics = make(map[string][]*LocalSubscription) + ps.Metrics.Register("local_pubsub_published", "counter") + ps.Metrics.Register("local_pubsub_received", "counter") return nil } @@ -62,6 +66,8 @@ func (ps *LocalPubSub) Publish(ctx context.Context, topic, message string) error ps.mut.Lock() defer ps.mut.Unlock() ps.ensureTopic(topic) + ps.Metrics.Count("local_pubsub_published", 1) + ps.Metrics.Count("local_pubsub_received", len(ps.topics[topic])) for _, sub := range ps.topics[topic] { // don't wait around for slow consumers if sub.cb != nil { diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index abaa60be8c..1809b3f64f 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/honeycombio/refinery/metrics" "github.com/honeycombio/refinery/pubsub" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -20,11 +21,17 @@ var types = []string{ func newPubSub(typ string) pubsub.PubSub { var ps pubsub.PubSub + m := &metrics.NullMetrics{} + m.Start() switch typ { case "goredis": - ps = &pubsub.GoRedisPubSub{} + ps = &pubsub.GoRedisPubSub{ + Metrics: m, + } case "local": - ps = &pubsub.LocalPubSub{} + ps = &pubsub.LocalPubSub{ + Metrics: m, + } default: panic("unknown pubsub type") } From 2865f6ab9285af90afab3f3c805d405b9cdd4dfc Mon Sep 17 00:00:00 2001 From: Kent Quirk Date: Wed, 17 Jul 2024 11:48:26 -0400 Subject: [PATCH 2/3] Fix some tests --- app/app_test.go | 2 +- sharder/deterministic_test.go | 15 ++++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/app/app_test.go b/app/app_test.go index 4a109664fd..ddf3763670 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -115,7 +115,7 @@ func newStartedApp( var err error if peers == nil { - peers = &peer.FilePeers{Cfg: c} + peers = &peer.FilePeers{Cfg: c, Metrics: &metrics.NullMetrics{}} } a := App{} diff --git a/sharder/deterministic_test.go b/sharder/deterministic_test.go index d4fc3b6a81..73ea093fd4 100644 --- a/sharder/deterministic_test.go +++ b/sharder/deterministic_test.go @@ -8,6 +8,7 @@ import ( "github.com/honeycombio/refinery/config" "github.com/honeycombio/refinery/internal/peer" "github.com/honeycombio/refinery/logger" + "github.com/honeycombio/refinery/metrics" "github.com/stretchr/testify/assert" ) @@ -29,7 +30,7 @@ func TestWhichShard(t *testing.T) { } done := make(chan struct{}) defer close(done) - filePeers := &peer.FilePeers{Cfg: config} + filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}} sharder := DeterministicSharder{ Config: config, Logger: &logger.NullLogger{}, @@ -71,7 +72,7 @@ func TestWhichShardAtEdge(t *testing.T) { } done := make(chan struct{}) defer close(done) - filePeers := &peer.FilePeers{Cfg: config} + filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}} sharder := DeterministicSharder{ Config: config, Logger: &logger.NullLogger{}, @@ -122,7 +123,7 @@ func BenchmarkShardBulk(b *testing.B) { } done := make(chan struct{}) defer close(done) - filePeers := &peer.FilePeers{Cfg: config} + filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}} sharder := DeterministicSharder{ Config: config, Logger: &logger.NullLogger{}, @@ -168,7 +169,7 @@ func TestShardBulk(t *testing.T) { } done := make(chan struct{}) defer close(done) - filePeers := &peer.FilePeers{Cfg: config} + filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}} sharder := DeterministicSharder{ Config: config, Logger: &logger.NullLogger{}, @@ -240,7 +241,7 @@ func TestShardDrop(t *testing.T) { } done := make(chan struct{}) defer close(done) - filePeers := &peer.FilePeers{Cfg: config} + filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}} sharder := DeterministicSharder{ Config: config, Logger: &logger.NullLogger{}, @@ -323,7 +324,7 @@ func TestShardAddHash(t *testing.T) { } done := make(chan struct{}) defer close(done) - filePeers := &peer.FilePeers{Cfg: config} + filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}} sharder := DeterministicSharder{ Config: config, Logger: &logger.NullLogger{}, @@ -404,7 +405,7 @@ func BenchmarkDeterministicShard(b *testing.B) { } done := make(chan struct{}) defer close(done) - filePeers := &peer.FilePeers{Cfg: config} + filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}} sharder := DeterministicSharder{ Config: config, Logger: &logger.NullLogger{}, From 371ce0695bc093207b38bfd2047b4465f6f8b542 Mon Sep 17 00:00:00 2001 From: Kent Quirk Date: Wed, 17 Jul 2024 13:36:09 -0400 Subject: [PATCH 3/3] more test fixes --- app/app_test.go | 8 +++++--- internal/peer/peers_test.go | 3 ++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/app/app_test.go b/app/app_test.go index ddf3763670..5c64272eb5 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -273,9 +273,11 @@ func TestAppIntegrationWithNonLegacyKey(t *testing.T) { resp.Body.Close() // Wait for span to be sent. - time.Sleep(2 * a.Config.GetSendTickerValue()) - events := sender.Events() - require.Len(t, events, 1) + var events []*transmission.Event + require.Eventually(t, func() bool { + events = sender.Events() + return len(events) == 1 + }, 2*time.Second, 2*time.Millisecond) assert.Equal(t, "dataset", events[0].Dataset) assert.Equal(t, "bar", events[0].Data["foo"]) diff --git a/internal/peer/peers_test.go b/internal/peer/peers_test.go index fe20f7a61f..fbeeca5953 100644 --- a/internal/peer/peers_test.go +++ b/internal/peer/peers_test.go @@ -30,7 +30,8 @@ func newPeers(c config.Config) (Peers, error) { switch ptype { case "file": peers = &FilePeers{ - Cfg: c, + Cfg: c, + Metrics: &metrics.NullMetrics{}, } // we know FilePeers doesn't need to be Started, so as long as we gave it a Cfg above, // we can ask it how many peers we have.