diff --git a/app/app_test.go b/app/app_test.go
index 15cc1ec254..4a109664fd 100644
--- a/app/app_test.go
+++ b/app/app_test.go
@@ -86,17 +86,6 @@ func (w *countingWriterSender) waitForCount(t testing.TB, target int) {
 	}
 }
 
-type testPeers struct {
-	peers []string
-}
-
-func (p *testPeers) GetPeers() ([]string, error) {
-	return p.peers, nil
-}
-
-func (p *testPeers) RegisterUpdatedPeersCallback(callback func()) {
-}
-
 func newStartedApp(
 	t testing.TB,
 	libhoneyT transmission.Sender,
@@ -126,8 +115,7 @@ func newStartedApp(
 
 	var err error
 	if peers == nil {
-		peers, err = peer.NewPeers(context.Background(), c, make(chan struct{}))
-		assert.NoError(t, err)
+		peers = &peer.FilePeers{Cfg: c}
 	}
 
 	a := App{}
@@ -333,15 +321,14 @@ func TestPeerRouting(t *testing.T) {
 	// Parallel integration tests need different ports!
 	t.Parallel()
 
-	peers := &testPeers{
-		peers: []string{
+	peers := &peer.MockPeers{
+		Peers: []string{
 			"http://localhost:11001",
 			"http://localhost:11003",
 		},
 	}
 
 	var apps [2]*App
-	var addrs [2]string
 	var senders [2]*transmission.MockSender
 	for i := range apps {
 		var graph inject.Graph
@@ -349,8 +336,6 @@ func TestPeerRouting(t *testing.T) {
 		senders[i] = &transmission.MockSender{}
 		apps[i], graph = newStartedApp(t, senders[i], basePort, peers, false)
 		defer startstop.Stop(graph.Objects(), nil)
-
-		addrs[i] = "localhost:" + strconv.Itoa(basePort)
 	}
 
 	// Deliver to host 1, it should be passed to host 0 and emitted there.
@@ -466,15 +451,14 @@ func TestHostMetadataSpanAdditions(t *testing.T) {
 
 func TestEventsEndpoint(t *testing.T) {
 	t.Parallel()
 
-	peers := &testPeers{
-		peers: []string{
+	peers := &peer.MockPeers{
+		Peers: []string{
 			"http://localhost:13001",
 			"http://localhost:13003",
 		},
 	}
 
 	var apps [2]*App
-	var addrs [2]string
 	var senders [2]*transmission.MockSender
 	for i := range apps {
 		var graph inject.Graph
@@ -482,8 +466,6 @@ func TestEventsEndpoint(t *testing.T) {
 		senders[i] = &transmission.MockSender{}
 		apps[i], graph = newStartedApp(t, senders[i], basePort, peers, false)
 		defer startstop.Stop(graph.Objects(), nil)
-
-		addrs[i] = "localhost:" + strconv.Itoa(basePort)
 	}
 
 	// Deliver to host 1, it should be passed to host 0 and emitted there.
@@ -582,15 +564,14 @@ func TestEventsEndpoint(t *testing.T) {
 
 func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
 	t.Parallel()
 
-	peers := &testPeers{
-		peers: []string{
+	peers := &peer.MockPeers{
+		Peers: []string{
 			"http://localhost:15001",
 			"http://localhost:15003",
 		},
 	}
 
 	var apps [2]*App
-	var addrs [2]string
 	var senders [2]*transmission.MockSender
 	for i := range apps {
 		basePort := 15000 + (i * 2)
@@ -600,8 +581,6 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
 		app.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
 		apps[i] = app
 		defer startstop.Stop(graph.Objects(), nil)
-
-		addrs[i] = "localhost:" + strconv.Itoa(basePort)
 	}
 
 	// this traceID was chosen because it hashes to the appropriate shard for this
@@ -845,8 +824,8 @@ func BenchmarkDistributedTraces(b *testing.B) {
 		},
 	}
 
-	peers := &testPeers{
-		peers: []string{
+	peers := &peer.MockPeers{
+		Peers: []string{
 			"http://localhost:12001",
 			"http://localhost:12003",
 			"http://localhost:12005",
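The deletion of the local `testPeers` type works because the suite now leans on the shared mock in `internal/peer`. A minimal sketch of the pattern these tests settle on, inside a test body (the port and sender values follow the surrounding tests, not new API):

```go
// Sketch: replacing the deleted local testPeers type with the shared
// peer.MockPeers, which also carries the no-op Start/Stop methods the
// widened Peers interface (see internal/peer/peers.go below) requires.
peers := &peer.MockPeers{
	Peers: []string{
		"http://localhost:11001",
		"http://localhost:11003",
	},
}
sender := &transmission.MockSender{}
app, graph := newStartedApp(t, sender, 11000, peers, false)
defer startstop.Stop(graph.Objects(), nil)
_ = app
```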
diff --git a/cmd/refinery/main.go b/cmd/refinery/main.go
index fb5acd5409..c92649662f 100644
--- a/cmd/refinery/main.go
+++ b/cmd/refinery/main.go
@@ -1,7 +1,6 @@
 package main
 
 import (
-	"context"
 	"fmt"
 	"net"
 	"net/http"
@@ -29,6 +28,7 @@ import (
 	"github.com/honeycombio/refinery/internal/peer"
 	"github.com/honeycombio/refinery/logger"
 	"github.com/honeycombio/refinery/metrics"
+	"github.com/honeycombio/refinery/pubsub"
 	"github.com/honeycombio/refinery/sample"
 	"github.com/honeycombio/refinery/service/debug"
 	"github.com/honeycombio/refinery/sharder"
@@ -113,14 +113,32 @@ func main() {
 		os.Exit(1)
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), c.GetPeerTimeout())
-	defer cancel()
-	done := make(chan struct{})
-	peers, err := peer.NewPeers(ctx, c, done)
-
+	var peers peer.Peers
+	var pubsubber pubsub.PubSub
+	ptype, err := c.GetPeerManagementType()
 	if err != nil {
-		fmt.Printf("unable to load peers: %+v\n", err)
-		os.Exit(1)
+		panic(err)
+	}
+	switch ptype {
+	case "file":
+		peers = &peer.FilePeers{Cfg: c}
+		// we know FilePeers doesn't need to be Started, so we can ask it how many peers we have.
+		// if we only have one, we can use the local pubsub implementation.
+		peerList, err := peers.GetPeers()
+		if err != nil {
+			panic(err)
+		}
+		if len(peerList) == 1 {
+			pubsubber = &pubsub.LocalPubSub{}
+		} else {
+			pubsubber = &pubsub.GoRedisPubSub{}
+		}
+	case "redis":
+		pubsubber = &pubsub.GoRedisPubSub{}
+		peers = &peer.RedisPubsubPeers{}
+	default:
+		// this should have been caught by validation
+		panic("invalid config option 'PeerManagement.Type'")
 	}
 
 	// upstreamTransport is the http transport used to send things on to Honeycomb
@@ -182,6 +200,7 @@ func main() {
 		os.Exit(1)
 	}
 
+	done := make(chan struct{})
 	stressRelief := &collect.StressRelief{Done: done}
 	upstreamTransmission := transmit.NewDefaultTransmission(upstreamClient, upstreamMetricsRecorder, "upstream")
 	peerTransmission := transmit.NewDefaultTransmission(peerClient, peerMetricsRecorder, "peer")
@@ -221,6 +240,7 @@ func main() {
 	objects := []*inject.Object{
 		{Value: c},
 		{Value: peers},
+		{Value: pubsubber},
 		{Value: lgr},
 		{Value: upstreamTransport, Name: "upstreamTransport"},
 		{Value: peerTransport, Name: "peerTransport"},
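The same type-to-implementation pairing reappears in `peers_test.go` below. As a sketch (the helper name `choosePeers` is illustrative, not part of this diff), the rule the switch encodes reads:

```go
// Sketch of the pairing rule: "file" peering with exactly one peer has
// nothing to coordinate, so the in-process LocalPubSub suffices;
// everything else coordinates through Redis-backed pubsub.
func choosePeers(c config.Config) (peer.Peers, pubsub.PubSub, error) {
	ptype, err := c.GetPeerManagementType()
	if err != nil {
		return nil, nil, err
	}
	switch ptype {
	case "file":
		p := &peer.FilePeers{Cfg: c}
		// safe before Start: FilePeers is purely config-backed
		peerList, err := p.GetPeers()
		if err != nil {
			return nil, nil, err
		}
		if len(peerList) == 1 {
			return p, &pubsub.LocalPubSub{}, nil
		}
		return p, &pubsub.GoRedisPubSub{}, nil
	case "redis":
		return &peer.RedisPubsubPeers{}, &pubsub.GoRedisPubSub{}, nil
	default:
		return nil, nil, errors.New("invalid config option 'PeerManagement.Type'")
	}
}
```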
diff --git a/config/metadata/configMeta.yaml b/config/metadata/configMeta.yaml
index 728566436b..5af8b71595 100644
--- a/config/metadata/configMeta.yaml
+++ b/config/metadata/configMeta.yaml
@@ -831,11 +831,13 @@ groups:
       description: >
         Peer management is the mechanism by which Refinery locates its peers.
 
-        `file` means that Refinery gets its peer list from
-        the Peers list in this config file.
+        `file` means that Refinery gets its peer list from the Peers list in
+        this config file.
 
-        `redis` means that Refinery self-registers with a Redis instance and
-        gets its peer list from there.
+        `redis` means that Refinery uses a Publish/Subscribe mechanism,
+        implemented on Redis, to propagate peer lists much more quickly than
+        the legacy mechanism. This is the recommended setting, especially for
+        new installations.
 
     - name: Identifier
       v1group: PeerManagement
@@ -902,9 +904,8 @@
   - name: RedisPeerManagement
     title: "Redis Peer Management"
     description: >
-      controls how the Refinery cluster communicates
-      between peers when using Redis. Only applies when `PeerManagement.Type`
-      is "redis".
+      controls how the Refinery cluster communicates between peers when using
+      Redis. Does not apply when `PeerManagement.Type` is "file".
 
     fields:
       - name: Host
@@ -968,6 +969,7 @@
         default: "refinery"
         example: "customPrefix"
         reload: false
+        lastversion: v2.6
        validations:
           - type: notempty
        summary: is a string used as a prefix for the keys in Redis while
@@ -985,6 +987,7 @@
         default: 0
         example: 1
         reload: false
+        lastversion: v2.6
        validations:
           - type: minimum
             arg: 0
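The `default: panic(...)` branch in `main.go` relies on config validation rejecting unknown types before startup gets that far. A sketch of what that guard looks like from the caller's side, using `config.MockConfig` and the `newPeers` helper that appears in `peers_test.go` below (the test name and the invalid value are illustrative):

```go
// Sketch (not in this diff): an unknown PeerManagement.Type surfaces as
// an error rather than a panic when routed through the test helper.
func TestInvalidPeerManagementType(t *testing.T) {
	c := &config.MockConfig{PeerManagementType: "carrier-pigeon"}
	_, err := newPeers(c)
	if err == nil {
		t.Error("expected error: invalid config option 'PeerManagement.Type'")
	}
}
```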
diff --git a/internal/peer/file.go b/internal/peer/file.go
index b47228a585..5ec68859cb 100644
--- a/internal/peer/file.go
+++ b/internal/peer/file.go
@@ -2,30 +2,31 @@ package peer
 
 import "github.com/honeycombio/refinery/config"
 
-type filePeers struct {
-	c config.Config
+type FilePeers struct {
+	Cfg config.Config `inject:""`
 }
 
-// NewFilePeers returns a peers collection backed by the config file
-func newFilePeers(c config.Config) Peers {
-	return &filePeers{
-		c: c,
-	}
-}
-
-func (p *filePeers) GetPeers() ([]string, error) {
+func (p *FilePeers) GetPeers() ([]string, error) {
 	// we never want to return an empty list of peers, so if the config
 	// returns an empty list, return a single peer. This keeps the sharding
 	// logic happy.
-	peers, err := p.c.GetPeers()
+	peers, err := p.Cfg.GetPeers()
 	if len(peers) == 0 {
 		peers = []string{"http://127.0.0.1:8081"}
 	}
 	return peers, err
 }
 
-func (p *filePeers) RegisterUpdatedPeersCallback(callback func()) {
+func (p *FilePeers) RegisterUpdatedPeersCallback(callback func()) {
 	// whenever registered, call the callback immediately
 	// otherwise do nothing since they never change
 	callback()
 }
+
+func (p *FilePeers) Start() error {
+	return nil
+}
+
+func (p *FilePeers) Stop() error {
+	return nil
+}
diff --git a/internal/peer/file_test.go b/internal/peer/file_test.go
index f913cbb385..837c69ae91 100644
--- a/internal/peer/file_test.go
+++ b/internal/peer/file_test.go
@@ -10,9 +10,13 @@ func TestFilePeers(t *testing.T) {
 	peers := []string{"peer"}
 
 	c := &config.MockConfig{
-		GetPeersVal: peers,
+		PeerManagementType: "file",
+		GetPeersVal:        peers,
+	}
+	p, err := newPeers(c)
+	if err != nil {
+		t.Error(err)
 	}
-	p := newFilePeers(c)
 
 	if d, _ := p.GetPeers(); !(len(d) == 1 && d[0] == "peer") {
 		t.Error("received", d, "expected", "[peer]")
diff --git a/internal/peer/mock.go b/internal/peer/mock.go
index 2bca0c5c10..51b28d2608 100644
--- a/internal/peer/mock.go
+++ b/internal/peer/mock.go
@@ -7,6 +7,15 @@ type MockPeers struct {
 func (p *MockPeers) GetPeers() ([]string, error) {
 	return p.Peers, nil
 }
+
 func (p *MockPeers) RegisterUpdatedPeersCallback(callback func()) {
 	callback()
 }
+
+func (p *MockPeers) Start() error {
+	return nil
+}
+
+func (p *MockPeers) Stop() error {
+	return nil
+}
diff --git a/internal/peer/peers.go b/internal/peer/peers.go
index ed84ab7764..070d95ed8e 100644
--- a/internal/peer/peers.go
+++ b/internal/peer/peers.go
@@ -1,32 +1,14 @@
 package peer
 
 import (
-	"context"
-	"errors"
-
-	"github.com/honeycombio/refinery/config"
+	"github.com/facebookgo/startstop"
 )
 
 // Peers holds the collection of peers for the cluster
 type Peers interface {
 	GetPeers() ([]string, error)
-
 	RegisterUpdatedPeersCallback(callback func())
-}
-
-func NewPeers(ctx context.Context, c config.Config, done chan struct{}) (Peers, error) {
-	t, err := c.GetPeerManagementType()
-
-	if err != nil {
-		return nil, err
-	}
-
-	switch t {
-	case "file":
-		return newFilePeers(c), nil
-	case "redis":
-		return newRedisPeers(ctx, c, done)
-	default:
-		return nil, errors.New("invalid config option 'PeerManagement.Type'")
-	}
+	// make it injectable
+	startstop.Starter
+	startstop.Stopper
 }
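Embedding `startstop.Starter` and `startstop.Stopper` means every `Peers` implementation must now expose `Start`/`Stop`; that is what the new no-op methods on `FilePeers` and `MockPeers` satisfy. A compile-time check (not in this diff) makes the contract visible inside the `peer` package:

```go
// Sketch: assert at compile time that each implementation still
// satisfies the widened Peers interface.
var (
	_ Peers = &FilePeers{}
	_ Peers = &MockPeers{}
	_ Peers = &RedisPubsubPeers{}
)
```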
diff --git a/internal/peer/peers_test.go b/internal/peer/peers_test.go
index e481d0796c..98a2c6d4f3 100644
--- a/internal/peer/peers_test.go
+++ b/internal/peer/peers_test.go
@@ -1,51 +1,81 @@
 package peer
 
 import (
-	"context"
+	"errors"
+	"fmt"
+	"os"
 	"strings"
 	"testing"
 	"time"
 
+	"github.com/facebookgo/inject"
+	"github.com/facebookgo/startstop"
 	"github.com/honeycombio/refinery/config"
+	"github.com/honeycombio/refinery/logger"
+	"github.com/honeycombio/refinery/pubsub"
+	"github.com/jonboulle/clockwork"
+	"github.com/sirupsen/logrus"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
-func TestNewPeers(t *testing.T) {
-	c := &config.MockConfig{
-		PeerManagementType: "file",
-		PeerTimeout:        5 * time.Second,
-		TraceIdFieldNames:  []string{"trace.trace_id"},
-		ParentIdFieldNames: []string{"trace.parent_id"},
+func newPeers(c config.Config) (Peers, error) {
+	var peers Peers
+	var pubsubber pubsub.PubSub
+	ptype, err := c.GetPeerManagementType()
+	if err != nil {
+		return nil, err
 	}
-
-	done := make(chan struct{})
-	defer close(done)
-	p, err := NewPeers(context.Background(), c, done)
-	assert.NoError(t, err)
-	require.NotNil(t, p)
-
-	switch i := p.(type) {
-	case *filePeers:
+	switch ptype {
+	case "file":
+		peers = &FilePeers{
+			Cfg: c,
+		}
+		// we know FilePeers doesn't need to be Started, so as long as we gave it a Cfg above,
+		// we can ask it how many peers we have.
+		// if we only have one, we can use the local pubsub implementation.
+		peerList, err := peers.GetPeers()
+		if err != nil {
+			return nil, err
+		}
+		if len(peerList) == 1 {
+			pubsubber = &pubsub.LocalPubSub{}
+		} else {
+			pubsubber = &pubsub.GoRedisPubSub{}
+		}
+	case "redis":
+		pubsubber = &pubsub.GoRedisPubSub{}
+		peers = &RedisPubsubPeers{}
 	default:
-		t.Errorf("received %T expected %T", i, &filePeers{})
+		// this should have been caught by validation
+		return nil, errors.New("invalid config option 'PeerManagement.Type'")
 	}
 
-	c = &config.MockConfig{
-		GetPeerListenAddrVal: "0.0.0.0:8081",
-		PeerManagementType:   "redis",
-		PeerTimeout:          5 * time.Second,
+	// we need to include all the metrics types so we can inject them in case they're needed
+	var g inject.Graph
+	objects := []*inject.Object{
+		{Value: c},
+		{Value: peers},
+		{Value: pubsubber},
+		{Value: &logger.NullLogger{}},
+		{Value: clockwork.NewFakeClock()},
+	}
+	err = g.Provide(objects...)
+	if err != nil {
+		return nil, fmt.Errorf("failed to provide injection graph. error: %+v\n", err)
 	}
 
-	p, err = NewPeers(context.Background(), c, done)
-	assert.NoError(t, err)
-	require.NotNil(t, p)
+	if err := g.Populate(); err != nil {
+		return nil, fmt.Errorf("failed to populate injection graph. error: %+v\n", err)
+	}
 
-	switch i := p.(type) {
-	case *redisPeers:
-	default:
-		t.Errorf("received %T expected %T", i, &redisPeers{})
+	ststLogger := logrus.New()
+	ststLogger.SetLevel(logrus.InfoLevel)
+	if err := startstop.Start(g.Objects(), ststLogger); err != nil {
+		fmt.Printf("failed to start injected dependencies. error: %+v\n", err)
+		os.Exit(1)
 	}
+	return peers, nil
 }
 
 func TestPeerShutdown(t *testing.T) {
@@ -55,12 +85,13 @@ func TestPeerShutdown(t *testing.T) {
 		PeerTimeout:          5 * time.Second,
 	}
 
+	p, err := newPeers(c)
+	require.NoError(t, err)
+
 	done := make(chan struct{})
-	p, err := NewPeers(context.Background(), c, done)
-	assert.NoError(t, err)
 	require.NotNil(t, p)
 
-	peer, ok := p.(*redisPeers)
+	peer, ok := p.(*RedisPubsubPeers)
 	assert.True(t, ok)
 
 	peers, err := peer.GetPeers()
@@ -74,6 +105,6 @@ func TestPeerShutdown(t *testing.T) {
 	assert.Eventually(t, func() bool {
 		peers, err = peer.GetPeers()
 		assert.NoError(t, err)
-		return len(peers) == 0
+		return len(peers) == 1
 	}, 5*time.Second, 200*time.Millisecond)
 }
diff --git a/internal/peer/pubsub_redis.go b/internal/peer/pubsub_redis.go
new file mode 100644
index 0000000000..033751c8ed
--- /dev/null
+++ b/internal/peer/pubsub_redis.go
@@ -0,0 +1,255 @@
+package peer
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/dgryski/go-wyhash"
+	"github.com/honeycombio/refinery/config"
+	"github.com/honeycombio/refinery/generics"
+	"github.com/honeycombio/refinery/pubsub"
+	"github.com/jonboulle/clockwork"
+	"github.com/sirupsen/logrus"
+)
+
+const (
+	// refreshCacheInterval is how frequently this host will re-register itself
+	// with Redis. This should happen about 3x during each timeout phase in order
+	// to allow multiple timeouts to fail and yet still keep the host in the mix.
+	// Falling out of Redis will result in re-hashing the host-trace affinity and
+	// will cause broken traces for those that fall on both sides of the rehashing.
+	// This is why it's important to ensure hosts stay in the pool.
+	refreshCacheInterval = 3 * time.Second
+
+	// peerEntryTimeout is how long redis will wait before expiring a peer that
+	// doesn't check in. The ratio of refresh to peer timeout should be 1/3. Redis
+	// timeouts are in seconds and entries can last up to 2 seconds longer than
+	// their expected timeout (in my load testing), so the lower bound for this
+	// timer should be ... 5sec?
+	peerEntryTimeout = 10 * time.Second
+)
+
+type peerAction string
+
+const (
+	Register   peerAction = "R"
+	Unregister peerAction = "U"
+)
+
+type peerCommand struct {
+	action peerAction
+	peer   string
+}
+
+func newPeerCommand(action peerAction, peer string) *peerCommand {
+	return &peerCommand{
+		action: action,
+		peer:   peer,
+	}
+}
+
+func (p *peerCommand) unmarshal(msg string) bool {
+	if len(msg) < 2 {
+		return false
+	}
+	p.action = peerAction(msg[:1])
+	p.peer = msg[1:]
+	switch p.action {
+	case Register, Unregister:
+		return true
+	default:
+		return false
+	}
+}
+
+func (p *peerCommand) marshal() string {
+	return string(p.action) + p.peer
+}
+
+type RedisPubsubPeers struct {
+	Config config.Config   `inject:""`
+	PubSub pubsub.PubSub   `inject:""`
+	Clock  clockwork.Clock `inject:""`
+
+	peers     *generics.SetWithTTL[string]
+	hash      uint64
+	callbacks []func()
+	sub       pubsub.Subscription
+	done      chan struct{}
+}
+
+// checkHash checks the hash of the current list of peers and calls any registered callbacks
+func (p *RedisPubsubPeers) checkHash() {
+	peers := p.peers.Members()
+	newhash := hashList(peers)
+	if newhash != p.hash {
+		p.hash = newhash
+		for _, cb := range p.callbacks {
+			go cb()
+		}
+	}
+}
+
+func (p *RedisPubsubPeers) listen(msg string) {
+	cmd := &peerCommand{}
+	if !cmd.unmarshal(msg) {
+		return
+	}
+	switch cmd.action {
+	case Unregister:
+		p.peers.Remove(cmd.peer)
+	case Register:
+		p.peers.Add(cmd.peer)
+	}
+	p.checkHash()
+}
+
+func (p *RedisPubsubPeers) Start() error {
+	if p.PubSub == nil {
+		return errors.New("injected pubsub is nil")
+	}
+
+	p.done = make(chan struct{})
+	p.peers = generics.NewSetWithTTL[string](peerEntryTimeout)
+	p.callbacks = make([]func(), 0)
+	p.sub = p.PubSub.Subscribe(context.Background(), "peers", p.listen)
+
+	myaddr, err := publicAddr(p.Config)
+	if err != nil {
+		return err
+	}
+
+	// periodically refresh our presence in the list of peers, and update peers as they come in
+	go func() {
+		ticker := p.Clock.NewTicker(refreshCacheInterval)
+		for {
+			select {
+			case <-p.done:
+				return
+			case <-ticker.Chan():
+				// publish our presence periodically
+				ctx, cancel := context.WithTimeout(context.Background(), p.Config.GetPeerTimeout())
+				p.PubSub.Publish(ctx, "peers", newPeerCommand(Register, myaddr).marshal())
+				cancel()
+			}
+		}
+	}()
+
+	return nil
+}
+
+func (p *RedisPubsubPeers) Stop() error {
+	// unregister ourselves
+	myaddr, err := publicAddr(p.Config)
+	if err != nil {
+		return err
+	}
+	p.PubSub.Publish(context.Background(), "peers", newPeerCommand(Unregister, myaddr).marshal())
+	close(p.done)
+	return nil
+}
+
+func (p *RedisPubsubPeers) GetPeers() ([]string, error) {
+	// we never want to return an empty list of peers, so if the system returns
+	// an empty list, return a single peer (its name doesn't really matter).
+	// This keeps the sharding logic happy.
+	peers := p.peers.Members()
+	if len(peers) == 0 {
+		peers = []string{"http://127.0.0.1:8081"}
+	}
+	return peers, nil
+}
+
+func (p *RedisPubsubPeers) RegisterUpdatedPeersCallback(callback func()) {
+	p.callbacks = append(p.callbacks, callback)
+}
+
+func publicAddr(c config.Config) (string, error) {
+	// compute the public version of my peer listen address
+	listenAddr, _ := c.GetPeerListenAddr()
+	// first, extract the port
+	_, port, err := net.SplitHostPort(listenAddr)
+
+	if err != nil {
+		return "", err
+	}
+
+	var myIdentifier string
+
+	// If RedisIdentifier is set, use as identifier.
+	if redisIdentifier, _ := c.GetRedisIdentifier(); redisIdentifier != "" {
+		myIdentifier = redisIdentifier
+		logrus.WithField("identifier", myIdentifier).Info("using specified RedisIdentifier from config")
+	} else {
+		// Otherwise, determine identifier from network interface.
+		myIdentifier, err = getIdentifierFromInterface(c)
+		if err != nil {
+			return "", err
+		}
+	}
+
+	publicListenAddr := fmt.Sprintf("http://%s:%s", myIdentifier, port)
+
+	return publicListenAddr, nil
+}
+
+// getIdentifierFromInterface returns a string that uniquely identifies this
+// host in the network. If an interface is specified, it will scan it to
+// determine an identifier from the first IP address on that interface.
+// Otherwise, it will use the hostname.
+func getIdentifierFromInterface(c config.Config) (string, error) {
+	myIdentifier, _ := os.Hostname()
+	identifierInterfaceName, _ := c.GetIdentifierInterfaceName()
+
+	if identifierInterfaceName != "" {
+		ifc, err := net.InterfaceByName(identifierInterfaceName)
+		if err != nil {
+			logrus.WithError(err).WithField("interface", identifierInterfaceName).
+				Error("IdentifierInterfaceName set but couldn't find interface by that name")
+			return "", err
+		}
+		addrs, err := ifc.Addrs()
+		if err != nil {
+			logrus.WithError(err).WithField("interface", identifierInterfaceName).
+				Error("IdentifierInterfaceName set but couldn't list addresses")
+			return "", err
+		}
+		var ipStr string
+		for _, addr := range addrs {
+			// ParseIP doesn't know what to do with the suffix
+			ip := net.ParseIP(strings.Split(addr.String(), "/")[0])
+			ipv6, _ := c.GetUseIPV6Identifier()
+			if ipv6 && ip.To16() != nil {
+				ipStr = fmt.Sprintf("[%s]", ip.String())
+				break
+			}
+			if !ipv6 && ip.To4() != nil {
+				ipStr = ip.String()
+				break
+			}
+		}
+		if ipStr == "" {
+			err = errors.New("could not find a valid IP to use from interface")
+			logrus.WithField("interface", ifc.Name).WithError(err)
+			return "", err
+		}
+		myIdentifier = ipStr
+		logrus.WithField("identifier", myIdentifier).WithField("interface", ifc.Name).Info("using identifier from interface")
+	}
+
+	return myIdentifier, nil
+}
+
+// hashList hashes a list of strings into a single uint64
+func hashList(list []string) uint64 {
+	var h uint64 = 255798297204 // arbitrary seed
+	for _, s := range list {
+		h = wyhash.Hash([]byte(s), h)
+	}
+	return h
+}
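The wire format on the `peers` pubsub topic is a single action byte followed by the peer's public address; `unmarshal` rejects anything shorter than two characters or with an unknown action byte. A round-trip sketch (a code fragment, not part of this diff; the address is a placeholder):

```go
// Sketch: the peerCommand wire format round trip.
cmd := newPeerCommand(Register, "http://10.0.0.5:8081")
msg := cmd.marshal() // "Rhttp://10.0.0.5:8081"

var decoded peerCommand
if decoded.unmarshal(msg) {
	fmt.Println(decoded.action, decoded.peer) // R http://10.0.0.5:8081
}
```

Note also that `GetPeers` backfills a single placeholder peer when the TTL set is empty, which is why `TestPeerShutdown` above now waits for a list of length 1 rather than 0.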
+ Error("IdentifierInterfaceName set but couldn't list addresses") + return "", err + } + var ipStr string + for _, addr := range addrs { + // ParseIP doesn't know what to do with the suffix + ip := net.ParseIP(strings.Split(addr.String(), "/")[0]) + ipv6, _ := c.GetUseIPV6Identifier() + if ipv6 && ip.To16() != nil { + ipStr = fmt.Sprintf("[%s]", ip.String()) + break + } + if !ipv6 && ip.To4() != nil { + ipStr = ip.String() + break + } + } + if ipStr == "" { + err = errors.New("could not find a valid IP to use from interface") + logrus.WithField("interface", ifc.Name).WithError(err) + return "", err + } + myIdentifier = ipStr + logrus.WithField("identifier", myIdentifier).WithField("interface", ifc.Name).Info("using identifier from interface") + } + + return myIdentifier, nil +} + +// hashList hashes a list of strings into a single uint64 +func hashList(list []string) uint64 { + var h uint64 = 255798297204 // arbitrary seed + for _, s := range list { + h = wyhash.Hash([]byte(s), h) + } + return h +} diff --git a/internal/peer/pubsub_test.go b/internal/peer/pubsub_test.go new file mode 100644 index 0000000000..2c1d549202 --- /dev/null +++ b/internal/peer/pubsub_test.go @@ -0,0 +1,51 @@ +package peer + +import ( + "testing" + + "github.com/honeycombio/refinery/config" + "github.com/stretchr/testify/assert" +) + +func Test_publicAddr(t *testing.T) { + cfg := &config.MockConfig{ + GetPeerListenAddrVal: "127.0.0.1:3443", + RedisIdentifier: "somehostname", + IdentifierInterfaceName: "en0", + } + tests := []struct { + name string + c config.Config + want string + wantErr bool + }{ + {"basic", cfg, "http://somehostname:3443", false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := publicAddr(tt.c) + if (err != nil) != tt.wantErr { + t.Errorf("publicAddr() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("publicAddr() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestPeerActions(t *testing.T) { + cmd := newPeerCommand(Register, "foo") + assert.Equal(t, "Rfoo", cmd.marshal()) + assert.Equal(t, "foo", cmd.peer) + assert.Equal(t, Register, cmd.action) + cmd2 := peerCommand{} + b := cmd2.unmarshal("Ubar") + assert.True(t, b) + assert.Equal(t, "bar", cmd2.peer) + assert.Equal(t, Unregister, cmd2.action) + + b = cmd2.unmarshal("invalid") + assert.False(t, b) +} diff --git a/internal/peer/redis.go b/internal/peer/redis.go deleted file mode 100644 index a8c0cba9b5..0000000000 --- a/internal/peer/redis.go +++ /dev/null @@ -1,359 +0,0 @@ -package peer - -import ( - "context" - "crypto/tls" - "errors" - "fmt" - "net" - "os" - "sort" - "strings" - "sync" - "time" - - "github.com/gomodule/redigo/redis" - "github.com/honeycombio/refinery/config" - "github.com/honeycombio/refinery/internal/redimem" - "github.com/sirupsen/logrus" -) - -const ( - // refreshCacheInterval is how frequently this host will re-register itself - // with Redis. This should happen about 3x during each timeout phase in order - // to allow multiple timeouts to fail and yet still keep the host in the mix. - // Falling out of Redis will result in re-hashing the host-trace affinity and - // will cause broken traces for those that fall on both sides of the rehashing. - // This is why it's important to ensure hosts stay in the pool. - refreshCacheInterval = 3 * time.Second - - // peerEntryTimeout is how long redis will wait before expiring a peer that - // doesn't check in. The ratio of refresh to peer timeout should be 1/3. 
diff --git a/internal/peer/redis.go b/internal/peer/redis.go
deleted file mode 100644
index a8c0cba9b5..0000000000
--- a/internal/peer/redis.go
+++ /dev/null
@@ -1,359 +0,0 @@
-package peer
-
-import (
-	"context"
-	"crypto/tls"
-	"errors"
-	"fmt"
-	"net"
-	"os"
-	"sort"
-	"strings"
-	"sync"
-	"time"
-
-	"github.com/gomodule/redigo/redis"
-	"github.com/honeycombio/refinery/config"
-	"github.com/honeycombio/refinery/internal/redimem"
-	"github.com/sirupsen/logrus"
-)
-
-const (
-	// refreshCacheInterval is how frequently this host will re-register itself
-	// with Redis. This should happen about 3x during each timeout phase in order
-	// to allow multiple timeouts to fail and yet still keep the host in the mix.
-	// Falling out of Redis will result in re-hashing the host-trace affinity and
-	// will cause broken traces for those that fall on both sides of the rehashing.
-	// This is why it's important to ensure hosts stay in the pool.
-	refreshCacheInterval = 3 * time.Second
-
-	// peerEntryTimeout is how long redis will wait before expiring a peer that
-	// doesn't check in. The ratio of refresh to peer timeout should be 1/3. Redis
-	// timeouts are in seconds and entries can last up to 2 seconds longer than
-	// their expected timeout (in my load testing), so the lower bound for this
-	// timer should be ... 5sec?
-	peerEntryTimeout = 10 * time.Second
-)
-
-type redisPeers struct {
-	store      *redimem.RedisMembership
-	peers      []string
-	peerLock   sync.Mutex
-	c          config.Config
-	callbacks  []func()
-	publicAddr string
-}
-
-// NewRedisPeers returns a peers collection backed by redis
-func newRedisPeers(ctx context.Context, c config.Config, done chan struct{}) (Peers, error) {
-	redisHost, _ := c.GetRedisHost()
-
-	if redisHost == "" {
-		redisHost = "localhost:6379"
-	}
-
-	options := buildOptions(c)
-	pool := &redis.Pool{
-		MaxIdle:     3,
-		MaxActive:   30,
-		IdleTimeout: 5 * time.Minute,
-		Wait:        true,
-		Dial: func() (redis.Conn, error) {
-			// if redis is started at the same time as refinery, connecting to redis can
-			// fail and cause refinery to error out.
-			// Instead, we will try to connect to redis for up to 10 seconds with
-			// a 1 second delay between attempts to allow the redis process to init
-			var (
-				conn redis.Conn
-				err  error
-			)
-			for timeout := time.After(10 * time.Second); ; {
-				select {
-				case <-timeout:
-					return nil, err
-				default:
-					if authCode, _ := c.GetRedisAuthCode(); authCode != "" {
-						conn, err = redis.Dial("tcp", redisHost, options...)
-						if err != nil {
-							return nil, err
-						}
-						if _, err := conn.Do("AUTH", authCode); err != nil {
-							conn.Close()
-							return nil, err
-						}
-						if err == nil {
-							return conn, nil
-						}
-					} else {
-						conn, err = redis.Dial("tcp", redisHost, options...)
-						if err == nil {
-							return conn, nil
-						}
-					}
-					time.Sleep(time.Second)
-				}
-			}
-		},
-	}
-
-	// deal with this error
-	address, err := publicAddr(c)
-
-	if err != nil {
-		return nil, err
-	}
-
-	peers := &redisPeers{
-		store: &redimem.RedisMembership{
-			Prefix: c.GetRedisPrefix(),
-			Pool:   pool,
-		},
-		peers:      make([]string, 1),
-		c:          c,
-		callbacks:  make([]func(), 0),
-		publicAddr: address,
-	}
-
-	// register myself once
-	err = peers.store.Register(ctx, address, peerEntryTimeout)
-	if err != nil {
-		logrus.WithError(err).Errorf("failed to register self with redis peer store")
-		return nil, err
-	}
-
-	// go establish a regular registration heartbeat to ensure I stay alive in redis
-	go peers.registerSelf(done)
-
-	// get our peer list once to seed ourselves
-	peers.updatePeerListOnce()
-
-	// go watch the list of peers and trigger callbacks whenever it changes.
-	// populate my local list of peers so each request can hit memory and only hit
-	// redis on a ticker
-	go peers.watchPeers(done)
-
-	return peers, nil
-}
-
-func (p *redisPeers) GetPeers() ([]string, error) {
-	p.peerLock.Lock()
-	defer p.peerLock.Unlock()
-	retList := make([]string, len(p.peers))
-	copy(retList, p.peers)
-	return retList, nil
-}
-
-func (p *redisPeers) RegisterUpdatedPeersCallback(cb func()) {
-	p.callbacks = append(p.callbacks, cb)
-}
-
-// registerSelf inserts self into the peer list and updates self's entry on a
-// regular basis so it doesn't time out and get removed from the list of peers.
-// When this function stops, it tries to remove the registered key.
-func (p *redisPeers) registerSelf(done chan struct{}) {
-	tk := time.NewTicker(refreshCacheInterval)
-	for {
-		select {
-		case <-tk.C:
-			ctx, cancel := context.WithTimeout(context.Background(), p.c.GetPeerTimeout())
-			// every interval, insert a timeout record. we ignore the error
-			// here since Register() logs the error for us.
-			p.store.Register(ctx, p.publicAddr, peerEntryTimeout)
-			cancel()
-		case <-done:
-			// unregister ourselves
-			ctx, cancel := context.WithTimeout(context.Background(), p.c.GetPeerTimeout())
-			p.store.Unregister(ctx, p.publicAddr)
-			cancel()
-			return
-		}
-	}
-}
-
-func (p *redisPeers) updatePeerListOnce() {
-	ctx, cancel := context.WithTimeout(context.Background(), p.c.GetPeerTimeout())
-	defer cancel()
-
-	currentPeers, err := p.store.GetMembers(ctx)
-	if err != nil {
-		logrus.WithError(err).
-			WithFields(logrus.Fields{
-				"name":    p.publicAddr,
-				"timeout": p.c.GetPeerTimeout().String(),
-			}).
-			Error("get members failed")
-		return
-	}
-	sort.Strings(currentPeers)
-
-	// update peer list and trigger callbacks saying the peer list has changed
-	p.peerLock.Lock()
-	p.peers = currentPeers
-	p.peerLock.Unlock()
-}
-
-func (p *redisPeers) watchPeers(done chan struct{}) {
-	oldPeerList := p.peers
-	sort.Strings(oldPeerList)
-	tk := time.NewTicker(refreshCacheInterval)
-
-	for {
-		select {
-		case <-tk.C:
-			ctx, cancel := context.WithTimeout(context.Background(), p.c.GetPeerTimeout())
-			currentPeers, err := p.store.GetMembers(ctx)
-			cancel()
-
-			if err != nil {
-				logrus.WithError(err).
-					WithFields(logrus.Fields{
-						"name":     p.publicAddr,
-						"timeout":  p.c.GetPeerTimeout().String(),
-						"oldPeers": oldPeerList,
-					}).
-					Error("get members failed during watch")
-				continue
-			}
-
-			sort.Strings(currentPeers)
-			if !equal(oldPeerList, currentPeers) {
-				// update peer list and trigger callbacks saying the peer list has changed
-				p.peerLock.Lock()
-				p.peers = currentPeers
-				oldPeerList = currentPeers
-				p.peerLock.Unlock()
-				for _, callback := range p.callbacks {
-					// don't block on any of the callbacks.
-					go callback()
-				}
-			}
-		case <-done:
-			p.peerLock.Lock()
-			p.peers = []string{}
-			p.peerLock.Unlock()
-			return
-		}
-	}
-}
-
-func buildOptions(c config.Config) []redis.DialOption {
-	options := []redis.DialOption{
-		redis.DialReadTimeout(1 * time.Second),
-		redis.DialConnectTimeout(1 * time.Second),
-		redis.DialDatabase(c.GetRedisDatabase()),
-	}
-
-	username, _ := c.GetRedisUsername()
-	if username != "" {
-		options = append(options, redis.DialUsername(username))
-	}
-
-	password, _ := c.GetRedisPassword()
-	if password != "" {
-		options = append(options, redis.DialPassword(password))
-	}
-
-	useTLS, _ := c.GetUseTLS()
-	tlsInsecure, _ := c.GetUseTLSInsecure()
-	if useTLS {
-		tlsConfig := &tls.Config{
-			MinVersion: tls.VersionTLS12,
-		}
-
-		if tlsInsecure {
-			tlsConfig.InsecureSkipVerify = true
-		}
-
-		options = append(options,
-			redis.DialTLSConfig(tlsConfig),
-			redis.DialUseTLS(true))
-	}
-
-	return options
-}
-
-func publicAddr(c config.Config) (string, error) {
-	// compute the public version of my peer listen address
-	listenAddr, _ := c.GetPeerListenAddr()
-	_, port, err := net.SplitHostPort(listenAddr)
-
-	if err != nil {
-		return "", err
-	}
-
-	var myIdentifier string
-
-	// If RedisIdentifier is set, use as identifier.
-	if redisIdentifier, _ := c.GetRedisIdentifier(); redisIdentifier != "" {
-		myIdentifier = redisIdentifier
-		logrus.WithField("identifier", myIdentifier).Info("using specified RedisIdentifier from config")
-	} else {
-		// Otherwise, determine identifier from network interface.
-		myIdentifier, err = getIdentifierFromInterfaces(c)
-		if err != nil {
-			return "", err
-		}
-	}
-
-	publicListenAddr := fmt.Sprintf("http://%s:%s", myIdentifier, port)
-
-	return publicListenAddr, nil
-}
-
-// Scan network interfaces to determine an identifier from either IP or hostname.
-func getIdentifierFromInterfaces(c config.Config) (string, error) {
-	myIdentifier, _ := os.Hostname()
-	identifierInterfaceName, _ := c.GetIdentifierInterfaceName()
-
-	if identifierInterfaceName != "" {
-		ifc, err := net.InterfaceByName(identifierInterfaceName)
-		if err != nil {
-			logrus.WithError(err).WithField("interface", identifierInterfaceName).
-				Error("IdentifierInterfaceName set but couldn't find interface by that name")
-			return "", err
-		}
-		addrs, err := ifc.Addrs()
-		if err != nil {
-			logrus.WithError(err).WithField("interface", identifierInterfaceName).
-				Error("IdentifierInterfaceName set but couldn't list addresses")
-			return "", err
-		}
-		var ipStr string
-		for _, addr := range addrs {
-			// ParseIP doesn't know what to do with the suffix
-			ip := net.ParseIP(strings.Split(addr.String(), "/")[0])
-			ipv6, _ := c.GetUseIPV6Identifier()
-			if ipv6 && ip.To16() != nil {
-				ipStr = fmt.Sprintf("[%s]", ip.String())
-				break
-			}
-			if !ipv6 && ip.To4() != nil {
-				ipStr = ip.String()
-				break
-			}
-		}
-		if ipStr == "" {
-			err = errors.New("could not find a valid IP to use from interface")
-			logrus.WithField("interface", ifc.Name).WithError(err)
-			return "", err
-		}
-		myIdentifier = ipStr
-		logrus.WithField("identifier", myIdentifier).WithField("interface", ifc.Name).Info("using identifier from interface")
-	}
-
-	return myIdentifier, nil
-}
-
-// equal tells whether a and b contain the same elements.
-// A nil argument is equivalent to an empty slice.
-// lifted from https://yourbasic.org/golang/compare-slices/
-func equal(a, b []string) bool {
-	if len(a) != len(b) {
-		return false
-	}
-	for i, v := range a {
-		if v != b[i] {
-			return false
-		}
-	}
-	return true
-}
diff --git a/pubsub/pubsub_local.go b/pubsub/pubsub_local.go
index a0cd392d8d..2472a66678 100644
--- a/pubsub/pubsub_local.go
+++ b/pubsub/pubsub_local.go
@@ -11,7 +11,7 @@ import (
 // not communicate with any external processes.
 // subs are individual channels for each subscription
 type LocalPubSub struct {
-	Config *config.Config `inject:""`
+	Config config.Config  `inject:""`
 	topics map[string][]*LocalSubscription
 	mut    sync.RWMutex
 }
diff --git a/sharder/deterministic_test.go b/sharder/deterministic_test.go
index f5fd90c2de..d4fc3b6a81 100644
--- a/sharder/deterministic_test.go
+++ b/sharder/deterministic_test.go
@@ -1,7 +1,6 @@
 package sharder
 
 import (
-	"context"
 	"fmt"
 	"math/rand"
 	"testing"
@@ -30,8 +29,7 @@ func TestWhichShard(t *testing.T) {
 	}
 	done := make(chan struct{})
 	defer close(done)
-	filePeers, err := peer.NewPeers(context.Background(), config, done)
-	assert.Equal(t, nil, err)
+	filePeers := &peer.FilePeers{Cfg: config}
 	sharder := DeterministicSharder{
 		Config: config,
 		Logger: &logger.NullLogger{},
@@ -73,8 +71,7 @@ func TestWhichShardAtEdge(t *testing.T) {
 	}
 	done := make(chan struct{})
 	defer close(done)
-	filePeers, err := peer.NewPeers(context.Background(), config, done)
-	assert.Equal(t, nil, err)
+	filePeers := &peer.FilePeers{Cfg: config}
 	sharder := DeterministicSharder{
 		Config: config,
 		Logger: &logger.NullLogger{},
@@ -125,8 +122,7 @@ func BenchmarkShardBulk(b *testing.B) {
 	}
 	done := make(chan struct{})
 	defer close(done)
-	filePeers, err := peer.NewPeers(context.Background(), config, done)
-	assert.Equal(b, nil, err)
+	filePeers := &peer.FilePeers{Cfg: config}
 	sharder := DeterministicSharder{
 		Config: config,
 		Logger: &logger.NullLogger{},
@@ -172,8 +168,7 @@ func TestShardBulk(t *testing.T) {
 	}
 	done := make(chan struct{})
 	defer close(done)
-	filePeers, err := peer.NewPeers(context.Background(), config, done)
-	assert.NoError(t, err, "NewPeers should succeed")
+	filePeers := &peer.FilePeers{Cfg: config}
 	sharder := DeterministicSharder{
 		Config: config,
 		Logger: &logger.NullLogger{},
@@ -245,8 +240,7 @@ func TestShardDrop(t *testing.T) {
 	}
 	done := make(chan struct{})
 	defer close(done)
-	filePeers, err := peer.NewPeers(context.Background(), config, done)
-	assert.Equal(t, nil, err)
+	filePeers := &peer.FilePeers{Cfg: config}
 	sharder := DeterministicSharder{
 		Config: config,
 		Logger: &logger.NullLogger{},
@@ -329,8 +323,7 @@ func TestShardAddHash(t *testing.T) {
 	}
 	done := make(chan struct{})
 	defer close(done)
-	filePeers, err := peer.NewPeers(context.Background(), config, done)
-	assert.Equal(t, nil, err)
+	filePeers := &peer.FilePeers{Cfg: config}
 	sharder := DeterministicSharder{
 		Config: config,
 		Logger: &logger.NullLogger{},
@@ -411,8 +404,7 @@ func BenchmarkDeterministicShard(b *testing.B) {
 	}
 	done := make(chan struct{})
 	defer close(done)
-	filePeers, err := peer.NewPeers(context.Background(), config, done)
-	assert.Equal(b, nil, err)
+	filePeers := &peer.FilePeers{Cfg: config}
 	sharder := DeterministicSharder{
 		Config: config,
 		Logger: &logger.NullLogger{},
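Across all of these sharder tests the replacement is mechanical: `FilePeers` needs no context, done channel, or `Start` call before `GetPeers`, so the construct-and-check pair collapses to a literal. Sketched once as a test-body fragment (the config values are placeholders, and the `Peers` field name is assumed from the sharder's injection tags; the hunks above cut off before it):

```go
// Sketch of the shared setup after this change.
config := &config.MockConfig{
	GetPeersVal: []string{"http://localhost:8081"},
}
filePeers := &peer.FilePeers{Cfg: config}
sharder := DeterministicSharder{
	Config: config,
	Logger: &logger.NullLogger{},
	Peers:  filePeers, // assumed field name, not shown in the hunks above
}
_ = sharder
```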