Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add peer logging, add debug log of peers #1239

Merged
merged 3 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 40 additions & 17 deletions internal/peer/pubsub_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (
"github.com/dgryski/go-wyhash"
"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/generics"
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/pubsub"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
)

const (
Expand Down Expand Up @@ -72,6 +72,7 @@ func (p *peerCommand) marshal() string {
type RedisPubsubPeers struct {
Config config.Config `inject:""`
Metrics metrics.Metrics `inject:"metrics"`
Logger logger.Logger `inject:""`
PubSub pubsub.PubSub `inject:""`
Clock clockwork.Clock `inject:""`

Expand Down Expand Up @@ -115,6 +116,13 @@ func (p *RedisPubsubPeers) Start() error {
if p.PubSub == nil {
return errors.New("injected pubsub is nil")
}
// if we didn't get an injected logger or metrics, use the null ones (for tests)
if p.Metrics == nil {
p.Metrics = &metrics.NullMetrics{}
}
if p.Logger == nil {
p.Logger = &logger.NullLogger{}
}

p.done = make(chan struct{})
p.peers = generics.NewSetWithTTL[string](peerEntryTimeout)
Expand All @@ -125,7 +133,7 @@ func (p *RedisPubsubPeers) Start() error {
p.Metrics.Register("peer_hash", "gauge")
p.Metrics.Register("peer_messages", "counter")

myaddr, err := publicAddr(p.Config)
myaddr, err := p.publicAddr()
if err != nil {
return err
}
Expand All @@ -137,6 +145,12 @@ func (p *RedisPubsubPeers) Start() error {
// to 20% of the interval
interval := refreshCacheInterval + time.Duration(rand.Int63n(int64(refreshCacheInterval/5)))
ticker := p.Clock.NewTicker(interval)
defer ticker.Stop()

// every 25-35 seconds, log the current state of the peers
// (we could make this configurable if we wanted but it's not that important)
logTicker := p.Clock.NewTicker((time.Duration(rand.Intn(10000))*time.Millisecond + (25 * time.Second)))
defer logTicker.Stop()
for {
select {
case <-p.done:
Expand All @@ -146,6 +160,13 @@ func (p *RedisPubsubPeers) Start() error {
ctx, cancel := context.WithTimeout(context.Background(), p.Config.GetPeerTimeout())
p.PubSub.Publish(ctx, "peers", newPeerCommand(Register, myaddr).marshal())
cancel()
case <-logTicker.Chan():
p.Logger.Debug().WithFields(map[string]any{
"peers": p.peers.Members(),
"hash": p.hash,
"num_peers": len(p.peers.Members()),
"self": myaddr,
}).Logf("peer report")
}
}
}()
Expand All @@ -155,7 +176,7 @@ func (p *RedisPubsubPeers) Start() error {

func (p *RedisPubsubPeers) Stop() error {
// unregister ourselves
myaddr, err := publicAddr(p.Config)
myaddr, err := p.publicAddr()
if err != nil {
return err
}
Expand All @@ -179,9 +200,9 @@ func (p *RedisPubsubPeers) RegisterUpdatedPeersCallback(callback func()) {
p.callbacks = append(p.callbacks, callback)
}

func publicAddr(c config.Config) (string, error) {
func (p *RedisPubsubPeers) publicAddr() (string, error) {
// compute the public version of my peer listen address
listenAddr, _ := c.GetPeerListenAddr()
listenAddr, _ := p.Config.GetPeerListenAddr()
// first, extract the port
_, port, err := net.SplitHostPort(listenAddr)

Expand All @@ -192,12 +213,12 @@ func publicAddr(c config.Config) (string, error) {
var myIdentifier string

// If RedisIdentifier is set, use as identifier.
if redisIdentifier, _ := c.GetRedisIdentifier(); redisIdentifier != "" {
if redisIdentifier, _ := p.Config.GetRedisIdentifier(); redisIdentifier != "" {
myIdentifier = redisIdentifier
logrus.WithField("identifier", myIdentifier).Info("using specified RedisIdentifier from config")
p.Logger.Info().WithField("identifier", myIdentifier).Logf("using specified RedisIdentifier from config")
} else {
// Otherwise, determine identifier from network interface.
myIdentifier, err = getIdentifierFromInterface(c)
myIdentifier, err = p.getIdentifierFromInterface()
if err != nil {
return "", err
}
Expand All @@ -212,28 +233,28 @@ func publicAddr(c config.Config) (string, error) {
// host in the network. If an interface is specified, it will scan it to
// determine an identifier from the first IP address on that interface.
// Otherwise, it will use the hostname.
func getIdentifierFromInterface(c config.Config) (string, error) {
func (p *RedisPubsubPeers) getIdentifierFromInterface() (string, error) {
myIdentifier, _ := os.Hostname()
identifierInterfaceName, _ := c.GetIdentifierInterfaceName()
identifierInterfaceName, _ := p.Config.GetIdentifierInterfaceName()

if identifierInterfaceName != "" {
ifc, err := net.InterfaceByName(identifierInterfaceName)
if err != nil {
logrus.WithError(err).WithField("interface", identifierInterfaceName).
Error("IdentifierInterfaceName set but couldn't find interface by that name")
p.Logger.Error().WithField("interface", identifierInterfaceName).
Logf("IdentifierInterfaceName set but couldn't find interface by that name")
return "", err
}
addrs, err := ifc.Addrs()
if err != nil {
logrus.WithError(err).WithField("interface", identifierInterfaceName).
Error("IdentifierInterfaceName set but couldn't list addresses")
p.Logger.Error().WithField("interface", identifierInterfaceName).
Logf("IdentifierInterfaceName set but couldn't list addresses")
return "", err
}
var ipStr string
for _, addr := range addrs {
// ParseIP doesn't know what to do with the suffix
ip := net.ParseIP(strings.Split(addr.String(), "/")[0])
ipv6, _ := c.GetUseIPV6Identifier()
ipv6, _ := p.Config.GetUseIPV6Identifier()
if ipv6 && ip.To16() != nil {
ipStr = fmt.Sprintf("[%s]", ip.String())
break
Expand All @@ -245,11 +266,13 @@ func getIdentifierFromInterface(c config.Config) (string, error) {
}
if ipStr == "" {
err = errors.New("could not find a valid IP to use from interface")
logrus.WithField("interface", ifc.Name).WithError(err)
p.Logger.Error().WithField("interface", ifc.Name).
Logf("IdentifierInterfaceName set but couldn't find a valid IP to use from interface")
return "", err
}
myIdentifier = ipStr
logrus.WithField("identifier", myIdentifier).WithField("interface", ifc.Name).Info("using identifier from interface")
p.Logger.Info().WithField("identifier", myIdentifier).WithField("interface", ifc.Name).
Logf("using identifier from interface")
}

return myIdentifier, nil
Expand Down
7 changes: 6 additions & 1 deletion internal/peer/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/logger"
"github.com/stretchr/testify/assert"
)

Expand All @@ -23,7 +24,11 @@ func Test_publicAddr(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := publicAddr(tt.c)
peers := &RedisPubsubPeers{
Config: tt.c,
Logger: &logger.NullLogger{},
}
got, err := peers.publicAddr()
if (err != nil) != tt.wantErr {
t.Errorf("publicAddr() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
Loading