diff --git a/internal/peer/pubsub_redis.go b/internal/peer/pubsub_redis.go index 0ba9c6d597..a9ecfa92c6 100644 --- a/internal/peer/pubsub_redis.go +++ b/internal/peer/pubsub_redis.go @@ -13,10 +13,10 @@ import ( "github.com/dgryski/go-wyhash" "github.com/honeycombio/refinery/config" "github.com/honeycombio/refinery/generics" + "github.com/honeycombio/refinery/logger" "github.com/honeycombio/refinery/metrics" "github.com/honeycombio/refinery/pubsub" "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" ) const ( @@ -72,6 +72,7 @@ func (p *peerCommand) marshal() string { type RedisPubsubPeers struct { Config config.Config `inject:""` Metrics metrics.Metrics `inject:"metrics"` + Logger logger.Logger `inject:""` PubSub pubsub.PubSub `inject:""` Clock clockwork.Clock `inject:""` @@ -115,6 +116,13 @@ func (p *RedisPubsubPeers) Start() error { if p.PubSub == nil { return errors.New("injected pubsub is nil") } + // if we didn't get an injected logger or metrics, use the null ones (for tests) + if p.Metrics == nil { + p.Metrics = &metrics.NullMetrics{} + } + if p.Logger == nil { + p.Logger = &logger.NullLogger{} + } p.done = make(chan struct{}) p.peers = generics.NewSetWithTTL[string](peerEntryTimeout) @@ -125,7 +133,7 @@ func (p *RedisPubsubPeers) Start() error { p.Metrics.Register("peer_hash", "gauge") p.Metrics.Register("peer_messages", "counter") - myaddr, err := publicAddr(p.Config) + myaddr, err := p.publicAddr() if err != nil { return err } @@ -137,6 +145,12 @@ func (p *RedisPubsubPeers) Start() error { // to 20% of the interval interval := refreshCacheInterval + time.Duration(rand.Int63n(int64(refreshCacheInterval/5))) ticker := p.Clock.NewTicker(interval) + defer ticker.Stop() + + // every 25-35 seconds, log the current state of the peers + // (we could make this configurable if we wanted but it's not that important) + logTicker := p.Clock.NewTicker((time.Duration(rand.Intn(10000))*time.Millisecond + (25 * time.Second))) + defer logTicker.Stop() for { select { case <-p.done: @@ -146,6 +160,13 @@ func (p *RedisPubsubPeers) Start() error { ctx, cancel := context.WithTimeout(context.Background(), p.Config.GetPeerTimeout()) p.PubSub.Publish(ctx, "peers", newPeerCommand(Register, myaddr).marshal()) cancel() + case <-logTicker.Chan(): + p.Logger.Debug().WithFields(map[string]any{ + "peers": p.peers.Members(), + "hash": p.hash, + "num_peers": len(p.peers.Members()), + "self": myaddr, + }).Logf("peer report") } } }() @@ -155,7 +176,7 @@ func (p *RedisPubsubPeers) Start() error { func (p *RedisPubsubPeers) Stop() error { // unregister ourselves - myaddr, err := publicAddr(p.Config) + myaddr, err := p.publicAddr() if err != nil { return err } @@ -179,9 +200,9 @@ func (p *RedisPubsubPeers) RegisterUpdatedPeersCallback(callback func()) { p.callbacks = append(p.callbacks, callback) } -func publicAddr(c config.Config) (string, error) { +func (p *RedisPubsubPeers) publicAddr() (string, error) { // compute the public version of my peer listen address - listenAddr, _ := c.GetPeerListenAddr() + listenAddr, _ := p.Config.GetPeerListenAddr() // first, extract the port _, port, err := net.SplitHostPort(listenAddr) @@ -192,12 +213,12 @@ func publicAddr(c config.Config) (string, error) { var myIdentifier string // If RedisIdentifier is set, use as identifier. - if redisIdentifier, _ := c.GetRedisIdentifier(); redisIdentifier != "" { + if redisIdentifier, _ := p.Config.GetRedisIdentifier(); redisIdentifier != "" { myIdentifier = redisIdentifier - logrus.WithField("identifier", myIdentifier).Info("using specified RedisIdentifier from config") + p.Logger.Info().WithField("identifier", myIdentifier).Logf("using specified RedisIdentifier from config") } else { // Otherwise, determine identifier from network interface. - myIdentifier, err = getIdentifierFromInterface(c) + myIdentifier, err = p.getIdentifierFromInterface() if err != nil { return "", err } @@ -212,28 +233,28 @@ func publicAddr(c config.Config) (string, error) { // host in the network. If an interface is specified, it will scan it to // determine an identifier from the first IP address on that interface. // Otherwise, it will use the hostname. -func getIdentifierFromInterface(c config.Config) (string, error) { +func (p *RedisPubsubPeers) getIdentifierFromInterface() (string, error) { myIdentifier, _ := os.Hostname() - identifierInterfaceName, _ := c.GetIdentifierInterfaceName() + identifierInterfaceName, _ := p.Config.GetIdentifierInterfaceName() if identifierInterfaceName != "" { ifc, err := net.InterfaceByName(identifierInterfaceName) if err != nil { - logrus.WithError(err).WithField("interface", identifierInterfaceName). - Error("IdentifierInterfaceName set but couldn't find interface by that name") + p.Logger.Error().WithField("interface", identifierInterfaceName). + Logf("IdentifierInterfaceName set but couldn't find interface by that name") return "", err } addrs, err := ifc.Addrs() if err != nil { - logrus.WithError(err).WithField("interface", identifierInterfaceName). - Error("IdentifierInterfaceName set but couldn't list addresses") + p.Logger.Error().WithField("interface", identifierInterfaceName). + Logf("IdentifierInterfaceName set but couldn't list addresses") return "", err } var ipStr string for _, addr := range addrs { // ParseIP doesn't know what to do with the suffix ip := net.ParseIP(strings.Split(addr.String(), "/")[0]) - ipv6, _ := c.GetUseIPV6Identifier() + ipv6, _ := p.Config.GetUseIPV6Identifier() if ipv6 && ip.To16() != nil { ipStr = fmt.Sprintf("[%s]", ip.String()) break @@ -245,11 +266,13 @@ func getIdentifierFromInterface(c config.Config) (string, error) { } if ipStr == "" { err = errors.New("could not find a valid IP to use from interface") - logrus.WithField("interface", ifc.Name).WithError(err) + p.Logger.Error().WithField("interface", ifc.Name). + Logf("IdentifierInterfaceName set but couldn't find a valid IP to use from interface") return "", err } myIdentifier = ipStr - logrus.WithField("identifier", myIdentifier).WithField("interface", ifc.Name).Info("using identifier from interface") + p.Logger.Info().WithField("identifier", myIdentifier).WithField("interface", ifc.Name). + Logf("using identifier from interface") } return myIdentifier, nil diff --git a/internal/peer/pubsub_test.go b/internal/peer/pubsub_test.go index 2c1d549202..d0b02cb0b0 100644 --- a/internal/peer/pubsub_test.go +++ b/internal/peer/pubsub_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/honeycombio/refinery/config" + "github.com/honeycombio/refinery/logger" "github.com/stretchr/testify/assert" ) @@ -23,7 +24,11 @@ func Test_publicAddr(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := publicAddr(tt.c) + peers := &RedisPubsubPeers{ + Config: tt.c, + Logger: &logger.NullLogger{}, + } + got, err := peers.publicAddr() if (err != nil) != tt.wantErr { t.Errorf("publicAddr() error = %v, wantErr %v", err, tt.wantErr) return