From b1cdfae3921b6c30ff8e692921e4da9550d27903 Mon Sep 17 00:00:00 2001 From: Kent Quirk Date: Fri, 21 Jun 2024 18:48:18 -0400 Subject: [PATCH 1/3] Update to use new pubsub api --- internal/peer/file.go | 8 ++ internal/peer/peers.go | 7 +- internal/peer/peers_test.go | 28 +++-- internal/peer/pubsub.go | 198 ++++++++++++++++++++++++++++++++++++ internal/peer/redis.go | 91 ++--------------- 5 files changed, 243 insertions(+), 89 deletions(-) create mode 100644 internal/peer/pubsub.go diff --git a/internal/peer/file.go b/internal/peer/file.go index b47228a585..d35db960a1 100644 --- a/internal/peer/file.go +++ b/internal/peer/file.go @@ -29,3 +29,11 @@ func (p *filePeers) RegisterUpdatedPeersCallback(callback func()) { // otherwise do nothing since they never change callback() } + +func (p *filePeers) Start() error { + return nil +} + +func (p *filePeers) Stop() error { + return nil +} diff --git a/internal/peer/peers.go b/internal/peer/peers.go index ed84ab7764..e131b7b71a 100644 --- a/internal/peer/peers.go +++ b/internal/peer/peers.go @@ -4,14 +4,17 @@ import ( "context" "errors" + "github.com/facebookgo/startstop" "github.com/honeycombio/refinery/config" ) // Peers holds the collection of peers for the cluster type Peers interface { GetPeers() ([]string, error) - RegisterUpdatedPeersCallback(callback func()) + // make it injectable + startstop.Starter + startstop.Stopper } func NewPeers(ctx context.Context, c config.Config, done chan struct{}) (Peers, error) { @@ -26,6 +29,8 @@ func NewPeers(ctx context.Context, c config.Config, done chan struct{}) (Peers, return newFilePeers(c), nil case "redis": return newRedisPeers(ctx, c, done) + case "pubsub": + return newPubsubPeers(c) default: return nil, errors.New("invalid config option 'PeerManagement.Type'") } diff --git a/internal/peer/peers_test.go b/internal/peer/peers_test.go index e481d0796c..feee62875b 100644 --- a/internal/peer/peers_test.go +++ b/internal/peer/peers_test.go @@ -7,11 +7,12 @@ import ( "time" "github.com/honeycombio/refinery/config" + "github.com/honeycombio/refinery/pubsub" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestNewPeers(t *testing.T) { +func TestNewPeersFile(t *testing.T) { c := &config.MockConfig{ PeerManagementType: "file", PeerTimeout: 5 * time.Second, @@ -30,21 +31,32 @@ func TestNewPeers(t *testing.T) { default: t.Errorf("received %T expected %T", i, &filePeers{}) } +} - c = &config.MockConfig{ - GetPeerListenAddrVal: "0.0.0.0:8081", - PeerManagementType: "redis", - PeerTimeout: 5 * time.Second, +func TestNewPeersPubSub(t *testing.T) { + c := &config.MockConfig{ + GetPeerListenAddrVal: "0.0.0.0:8081", + PeerManagementType: "pubsub", + PeerTimeout: 5 * time.Second, + IdentifierInterfaceName: "eth0", } - p, err = NewPeers(context.Background(), c, done) + pubsub := pubsub.LocalPubSub{} + pubsub.Start() + + done := make(chan struct{}) + defer close(done) + p, err := NewPeers(context.Background(), c, done) assert.NoError(t, err) require.NotNil(t, p) + p.(*pubsubPeers).PubSub = &pubsub + p.Start() + defer p.Stop() switch i := p.(type) { - case *redisPeers: + case *pubsubPeers: default: - t.Errorf("received %T expected %T", i, &redisPeers{}) + t.Errorf("received %T expected %T", i, &pubsubPeers{}) } } diff --git a/internal/peer/pubsub.go b/internal/peer/pubsub.go new file mode 100644 index 0000000000..22819767f2 --- /dev/null +++ b/internal/peer/pubsub.go @@ -0,0 +1,198 @@ +package peer + +import ( + "context" + "errors" + "fmt" + "net" + "os" + "strings" + "time" + + "github.com/dgryski/go-wyhash" + "github.com/honeycombio/refinery/config" + "github.com/honeycombio/refinery/generics" + "github.com/honeycombio/refinery/pubsub" + "github.com/jonboulle/clockwork" + "github.com/sirupsen/logrus" +) + +const ( + // refreshCacheInterval is how frequently this host will re-register itself + // with Redis. This should happen about 3x during each timeout phase in order + // to allow multiple timeouts to fail and yet still keep the host in the mix. + // Falling out of Redis will result in re-hashing the host-trace affinity and + // will cause broken traces for those that fall on both sides of the rehashing. + // This is why it's important to ensure hosts stay in the pool. + refreshCacheInterval = 3 * time.Second + + // peerEntryTimeout is how long redis will wait before expiring a peer that + // doesn't check in. The ratio of refresh to peer timeout should be 1/3. Redis + // timeouts are in seconds and entries can last up to 2 seconds longer than + // their expected timeout (in my load testing), so the lower bound for this + // timer should be ... 5sec? + peerEntryTimeout = 10 * time.Second +) + +type pubsubPeers struct { + Config config.Config `inject:""` + PubSub pubsub.PubSub `inject:""` + Clock clockwork.Clock `inject:""` + + peers *generics.SetWithTTL[string] + hash uint64 + callbacks []func() + sub pubsub.Subscription + done chan struct{} +} + +// NewPubsubPeers returns a peers collection backed by a pubsub system. +// It expects members to publish their own presence to the pubsub system every so often +// as determined by refreshCacheInterval. If they fail to do after peerEntryTimeout, they +// will be removed from the list of peers. +// They can also remove themselves from the list of peers by publishing an "unregister" message. +// The register message is just "register address" and the unregister message is "unregister address". +func newPubsubPeers(c config.Config) (Peers, error) { + return &pubsubPeers{ + Config: c, + Clock: clockwork.NewRealClock(), + }, nil +} + +// checkHash checks the hash of the current list of peers and calls any registered callbacks +func (p *pubsubPeers) checkHash() { + peers := p.peers.Members() + newhash := hashList(peers) + if newhash != p.hash { + p.hash = newhash + for _, cb := range p.callbacks { + go cb() + } + } +} + +func (p *pubsubPeers) Start() error { + if p.PubSub == nil { + return errors.New("injected pubsub is nil") + } + + p.done = make(chan struct{}) + p.peers = generics.NewSetWithTTL[string](peerEntryTimeout) + p.callbacks = make([]func(), 0) + p.sub = p.PubSub.Subscribe(context.Background(), "peers") + + myname, err := getIdentifierFromInterfaces(p.Config) + if err != nil { + return err + } + + // periodically refresh our presence in the list of peers, and update peers as they come in + go func() { + ticker := p.Clock.NewTicker(refreshCacheInterval) + for { + select { + case <-p.done: + return + case <-ticker.Chan(): + // publish our presence periodically + ctx, cancel := context.WithTimeout(context.Background(), p.Config.GetPeerTimeout()) + p.PubSub.Publish(ctx, "peers", "register "+myname) + cancel() + case msg := <-p.sub.Channel(): + parts := strings.Split(msg, " ") + if len(parts) != 2 { + continue + } + action, peer := parts[0], parts[1] + switch action { + case "unregister": + p.peers.Remove(peer) + case "register": + p.peers.Add(peer) + } + p.checkHash() + } + } + }() + + return nil +} + +func (p *pubsubPeers) Stop() error { + // unregister ourselves + myname, err := getIdentifierFromInterfaces(p.Config) + if err != nil { + return err + } + p.PubSub.Publish(context.Background(), "peers", "unregister "+myname) + close(p.done) + return nil +} + +func (p *pubsubPeers) GetPeers() ([]string, error) { + // we never want to return an empty list of peers, so if the system returns + // an empty list, return a single peer (its name doesn't really matter). + // This keeps the sharding logic happy. + peers := p.peers.Members() + if len(peers) == 0 { + peers = []string{"http://127.0.0.1:8081"} + } + return peers, nil +} + +func (p *pubsubPeers) RegisterUpdatedPeersCallback(callback func()) { + p.callbacks = append(p.callbacks, callback) +} + +// Scan network interfaces to determine an identifier from either IP or hostname. +func getIdentifierFromInterfaces(c config.Config) (string, error) { + myIdentifier, _ := os.Hostname() + identifierInterfaceName, _ := c.GetIdentifierInterfaceName() + + if identifierInterfaceName != "" { + ifc, err := net.InterfaceByName(identifierInterfaceName) + if err != nil { + logrus.WithError(err).WithField("interface", identifierInterfaceName). + Error("IdentifierInterfaceName set but couldn't find interface by that name") + return "", err + } + addrs, err := ifc.Addrs() + if err != nil { + logrus.WithError(err).WithField("interface", identifierInterfaceName). + Error("IdentifierInterfaceName set but couldn't list addresses") + return "", err + } + var ipStr string + for _, addr := range addrs { + // ParseIP doesn't know what to do with the suffix + ip := net.ParseIP(strings.Split(addr.String(), "/")[0]) + ipv6, _ := c.GetUseIPV6Identifier() + if ipv6 && ip.To16() != nil { + ipStr = fmt.Sprintf("[%s]", ip.String()) + break + } + if !ipv6 && ip.To4() != nil { + ipStr = ip.String() + break + } + } + if ipStr == "" { + err = errors.New("could not find a valid IP to use from interface") + logrus.WithField("interface", ifc.Name).WithError(err) + return "", err + } + myIdentifier = ipStr + logrus.WithField("identifier", myIdentifier).WithField("interface", ifc.Name).Info("using identifier from interface") + } + + return myIdentifier, nil +} + +// hashList hashes a list of strings into a single uint64 +func hashList(list []string) uint64 { + var h uint64 = 255798297204 // arbitrary seed + for _, s := range list { + h = wyhash.Hash([]byte(s), h) + } + return h +} diff --git a/internal/peer/redis.go b/internal/peer/redis.go index a8c0cba9b5..b1c26a5caa 100644 --- a/internal/peer/redis.go +++ b/internal/peer/redis.go @@ -3,12 +3,10 @@ package peer import ( "context" "crypto/tls" - "errors" "fmt" "net" - "os" + "slices" "sort" - "strings" "sync" "time" @@ -18,23 +16,6 @@ import ( "github.com/sirupsen/logrus" ) -const ( - // refreshCacheInterval is how frequently this host will re-register itself - // with Redis. This should happen about 3x during each timeout phase in order - // to allow multiple timeouts to fail and yet still keep the host in the mix. - // Falling out of Redis will result in re-hashing the host-trace affinity and - // will cause broken traces for those that fall on both sides of the rehashing. - // This is why it's important to ensure hosts stay in the pool. - refreshCacheInterval = 3 * time.Second - - // peerEntryTimeout is how long redis will wait before expiring a peer that - // doesn't check in. The ratio of refresh to peer timeout should be 1/3. Redis - // timeouts are in seconds and entries can last up to 2 seconds longer than - // their expected timeout (in my load testing), so the lower bound for this - // timer should be ... 5sec? - peerEntryTimeout = 10 * time.Second -) - type redisPeers struct { store *redimem.RedisMembership peers []string @@ -147,6 +128,15 @@ func (p *redisPeers) RegisterUpdatedPeersCallback(cb func()) { p.callbacks = append(p.callbacks, cb) } +// old-style redis peers don't need to start or stop but they do need the functions +func (p *redisPeers) Start() error { + return nil +} + +func (p *redisPeers) Stop() error { + return nil +} + // registerSelf inserts self into the peer list and updates self's entry on a // regular basis so it doesn't time out and get removed from the list of peers. // When this function stops, it tries to remove the registered key. @@ -215,7 +205,7 @@ func (p *redisPeers) watchPeers(done chan struct{}) { } sort.Strings(currentPeers) - if !equal(oldPeerList, currentPeers) { + if !slices.Equal(oldPeerList, currentPeers) { // update peer list and trigger callbacks saying the peer list has changed p.peerLock.Lock() p.peers = currentPeers @@ -298,62 +288,3 @@ func publicAddr(c config.Config) (string, error) { return publicListenAddr, nil } - -// Scan network interfaces to determine an identifier from either IP or hostname. -func getIdentifierFromInterfaces(c config.Config) (string, error) { - myIdentifier, _ := os.Hostname() - identifierInterfaceName, _ := c.GetIdentifierInterfaceName() - - if identifierInterfaceName != "" { - ifc, err := net.InterfaceByName(identifierInterfaceName) - if err != nil { - logrus.WithError(err).WithField("interface", identifierInterfaceName). - Error("IdentifierInterfaceName set but couldn't find interface by that name") - return "", err - } - addrs, err := ifc.Addrs() - if err != nil { - logrus.WithError(err).WithField("interface", identifierInterfaceName). - Error("IdentifierInterfaceName set but couldn't list addresses") - return "", err - } - var ipStr string - for _, addr := range addrs { - // ParseIP doesn't know what to do with the suffix - ip := net.ParseIP(strings.Split(addr.String(), "/")[0]) - ipv6, _ := c.GetUseIPV6Identifier() - if ipv6 && ip.To16() != nil { - ipStr = fmt.Sprintf("[%s]", ip.String()) - break - } - if !ipv6 && ip.To4() != nil { - ipStr = ip.String() - break - } - } - if ipStr == "" { - err = errors.New("could not find a valid IP to use from interface") - logrus.WithField("interface", ifc.Name).WithError(err) - return "", err - } - myIdentifier = ipStr - logrus.WithField("identifier", myIdentifier).WithField("interface", ifc.Name).Info("using identifier from interface") - } - - return myIdentifier, nil -} - -// equal tells whether a and b contain the same elements. -// A nil argument is equivalent to an empty slice. -// lifted from https://yourbasic.org/golang/compare-slices/ -func equal(a, b []string) bool { - if len(a) != len(b) { - return false - } - for i, v := range a { - if v != b[i] { - return false - } - } - return true -} From 5f03ef4faab3f2721f5bba75f162d3f11c2c86b7 Mon Sep 17 00:00:00 2001 From: Kent Quirk Date: Fri, 21 Jun 2024 18:56:07 -0400 Subject: [PATCH 2/3] Fix mock and use it in app_test --- app/app_test.go | 27 ++++++++------------------- internal/peer/mock.go | 12 ++++++++++++ 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/app/app_test.go b/app/app_test.go index c6cd558dfc..5004a23d19 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -85,17 +85,6 @@ func (w *countingWriterSender) waitForCount(t testing.TB, target int) { } } -type testPeers struct { - peers []string -} - -func (p *testPeers) GetPeers() ([]string, error) { - return p.peers, nil -} - -func (p *testPeers) RegisterUpdatedPeersCallback(callback func()) { -} - func newStartedApp( t testing.TB, libhoneyT transmission.Sender, @@ -331,8 +320,8 @@ func TestPeerRouting(t *testing.T) { // Parallel integration tests need different ports! t.Parallel() - peers := &testPeers{ - peers: []string{ + peers := &peer.MockPeers{ + Peers: []string{ "http://localhost:11001", "http://localhost:11003", }, @@ -464,8 +453,8 @@ func TestHostMetadataSpanAdditions(t *testing.T) { func TestEventsEndpoint(t *testing.T) { t.Parallel() - peers := &testPeers{ - peers: []string{ + peers := &peer.MockPeers{ + Peers: []string{ "http://localhost:13001", "http://localhost:13003", }, @@ -580,8 +569,8 @@ func TestEventsEndpoint(t *testing.T) { func TestEventsEndpointWithNonLegacyKey(t *testing.T) { t.Parallel() - peers := &testPeers{ - peers: []string{ + peers := &peer.MockPeers{ + Peers: []string{ "http://localhost:15001", "http://localhost:15003", }, @@ -843,8 +832,8 @@ func BenchmarkDistributedTraces(b *testing.B) { }, } - peers := &testPeers{ - peers: []string{ + peers := &peer.MockPeers{ + Peers: []string{ "http://localhost:12001", "http://localhost:12003", "http://localhost:12005", diff --git a/internal/peer/mock.go b/internal/peer/mock.go index 2bca0c5c10..cc8b00202b 100644 --- a/internal/peer/mock.go +++ b/internal/peer/mock.go @@ -4,9 +4,21 @@ type MockPeers struct { Peers []string } +// ensure that MockPeers implements the Peers interface +var _ Peers = (*MockPeers)(nil) + func (p *MockPeers) GetPeers() ([]string, error) { return p.Peers, nil } + func (p *MockPeers) RegisterUpdatedPeersCallback(callback func()) { callback() } + +func (p *MockPeers) Start() error { + return nil +} + +func (p *MockPeers) Stop() error { + return nil +} From d13b69e65b29f1e9fe0e83dfb83a188f52e81b57 Mon Sep 17 00:00:00 2001 From: Kent Quirk Date: Sun, 23 Jun 2024 10:39:28 -0400 Subject: [PATCH 3/3] Use publicAddr, clean up a bit --- internal/peer/pubsub.go | 44 +++++++++++++++++++++++++++++++----- internal/peer/pubsub_test.go | 35 ++++++++++++++++++++++++++++ internal/peer/redis.go | 30 ------------------------ 3 files changed, 73 insertions(+), 36 deletions(-) create mode 100644 internal/peer/pubsub_test.go diff --git a/internal/peer/pubsub.go b/internal/peer/pubsub.go index 22819767f2..b614c5d794 100644 --- a/internal/peer/pubsub.go +++ b/internal/peer/pubsub.go @@ -81,7 +81,7 @@ func (p *pubsubPeers) Start() error { p.callbacks = make([]func(), 0) p.sub = p.PubSub.Subscribe(context.Background(), "peers") - myname, err := getIdentifierFromInterfaces(p.Config) + myaddr, err := publicAddr(p.Config) if err != nil { return err } @@ -96,7 +96,7 @@ func (p *pubsubPeers) Start() error { case <-ticker.Chan(): // publish our presence periodically ctx, cancel := context.WithTimeout(context.Background(), p.Config.GetPeerTimeout()) - p.PubSub.Publish(ctx, "peers", "register "+myname) + p.PubSub.Publish(ctx, "peers", "register "+myaddr) cancel() case msg := <-p.sub.Channel(): parts := strings.Split(msg, " ") @@ -120,11 +120,11 @@ func (p *pubsubPeers) Start() error { func (p *pubsubPeers) Stop() error { // unregister ourselves - myname, err := getIdentifierFromInterfaces(p.Config) + myaddr, err := publicAddr(p.Config) if err != nil { return err } - p.PubSub.Publish(context.Background(), "peers", "unregister "+myname) + p.PubSub.Publish(context.Background(), "peers", "unregister "+myaddr) close(p.done) return nil } @@ -144,8 +144,40 @@ func (p *pubsubPeers) RegisterUpdatedPeersCallback(callback func()) { p.callbacks = append(p.callbacks, callback) } -// Scan network interfaces to determine an identifier from either IP or hostname. -func getIdentifierFromInterfaces(c config.Config) (string, error) { +func publicAddr(c config.Config) (string, error) { + // compute the public version of my peer listen address + listenAddr, _ := c.GetPeerListenAddr() + // first, extract the port + _, port, err := net.SplitHostPort(listenAddr) + + if err != nil { + return "", err + } + + var myIdentifier string + + // If RedisIdentifier is set, use as identifier. + if redisIdentifier, _ := c.GetRedisIdentifier(); redisIdentifier != "" { + myIdentifier = redisIdentifier + logrus.WithField("identifier", myIdentifier).Info("using specified RedisIdentifier from config") + } else { + // Otherwise, determine identifier from network interface. + myIdentifier, err = getIdentifierFromInterface(c) + if err != nil { + return "", err + } + } + + publicListenAddr := fmt.Sprintf("http://%s:%s", myIdentifier, port) + + return publicListenAddr, nil +} + +// getIdentifierFromInterface returns a string that uniquely identifies this +// host in the network. If an interface is specified, it will scan it to +// determine an identifier from the first IP address on that interface. +// Otherwise, it will use the hostname. +func getIdentifierFromInterface(c config.Config) (string, error) { myIdentifier, _ := os.Hostname() identifierInterfaceName, _ := c.GetIdentifierInterfaceName() diff --git a/internal/peer/pubsub_test.go b/internal/peer/pubsub_test.go new file mode 100644 index 0000000000..ea2b341aad --- /dev/null +++ b/internal/peer/pubsub_test.go @@ -0,0 +1,35 @@ +package peer + +import ( + "testing" + + "github.com/honeycombio/refinery/config" +) + +func Test_publicAddr(t *testing.T) { + cfg := &config.MockConfig{ + GetPeerListenAddrVal: "127.0.0.1:3443", + RedisIdentifier: "somehostname", + IdentifierInterfaceName: "en0", + } + tests := []struct { + name string + c config.Config + want string + wantErr bool + }{ + {"basic", cfg, "http://somehostname:3443", false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := publicAddr(tt.c) + if (err != nil) != tt.wantErr { + t.Errorf("publicAddr() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("publicAddr() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/internal/peer/redis.go b/internal/peer/redis.go index b1c26a5caa..05983ce899 100644 --- a/internal/peer/redis.go +++ b/internal/peer/redis.go @@ -3,8 +3,6 @@ package peer import ( "context" "crypto/tls" - "fmt" - "net" "slices" "sort" "sync" @@ -260,31 +258,3 @@ func buildOptions(c config.Config) []redis.DialOption { return options } - -func publicAddr(c config.Config) (string, error) { - // compute the public version of my peer listen address - listenAddr, _ := c.GetPeerListenAddr() - _, port, err := net.SplitHostPort(listenAddr) - - if err != nil { - return "", err - } - - var myIdentifier string - - // If RedisIdentifier is set, use as identifier. - if redisIdentifier, _ := c.GetRedisIdentifier(); redisIdentifier != "" { - myIdentifier = redisIdentifier - logrus.WithField("identifier", myIdentifier).Info("using specified RedisIdentifier from config") - } else { - // Otherwise, determine identifier from network interface. - myIdentifier, err = getIdentifierFromInterfaces(c) - if err != nil { - return "", err - } - } - - publicListenAddr := fmt.Sprintf("http://%s:%s", myIdentifier, port) - - return publicListenAddr, nil -}