Skip to content

Commit

Permalink
feat: peer management on pubsub via callbacks (#1220)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

- Implement the current peers system using a pubsub model for peers
- Uses the current pubsub system that uses callbacks

## Short description of the changes

- Add start/stop to pubsub so it can become injectable
- Fix the mock and existing implementations
- Implement pubsub peers using the new pubsub system and
generic.SetWithTTL
- Add a few tests
- Update config metadata
- delete legacy redis peering
- make it more idiomatically injectable
- do all the things to make injection work everywhere

closes #1201

---------

Co-authored-by: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com>
  • Loading branch information
kentquirk and VinozzZ authored Jul 3, 2024
1 parent ef992e4 commit f8d7c17
Show file tree
Hide file tree
Showing 13 changed files with 456 additions and 488 deletions.
39 changes: 9 additions & 30 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,6 @@ func (w *countingWriterSender) waitForCount(t testing.TB, target int) {
}
}

type testPeers struct {
peers []string
}

func (p *testPeers) GetPeers() ([]string, error) {
return p.peers, nil
}

func (p *testPeers) RegisterUpdatedPeersCallback(callback func()) {
}

func newStartedApp(
t testing.TB,
libhoneyT transmission.Sender,
Expand Down Expand Up @@ -126,8 +115,7 @@ func newStartedApp(

var err error
if peers == nil {
peers, err = peer.NewPeers(context.Background(), c, make(chan struct{}))
assert.NoError(t, err)
peers = &peer.FilePeers{Cfg: c}
}

a := App{}
Expand Down Expand Up @@ -333,24 +321,21 @@ func TestPeerRouting(t *testing.T) {
// Parallel integration tests need different ports!
t.Parallel()

peers := &testPeers{
peers: []string{
peers := &peer.MockPeers{
Peers: []string{
"http://localhost:11001",
"http://localhost:11003",
},
}

var apps [2]*App
var addrs [2]string
var senders [2]*transmission.MockSender
for i := range apps {
var graph inject.Graph
basePort := 11000 + (i * 2)
senders[i] = &transmission.MockSender{}
apps[i], graph = newStartedApp(t, senders[i], basePort, peers, false)
defer startstop.Stop(graph.Objects(), nil)

addrs[i] = "localhost:" + strconv.Itoa(basePort)
}

// Deliver to host 1, it should be passed to host 0 and emitted there.
Expand Down Expand Up @@ -466,24 +451,21 @@ func TestHostMetadataSpanAdditions(t *testing.T) {
func TestEventsEndpoint(t *testing.T) {
t.Parallel()

peers := &testPeers{
peers: []string{
peers := &peer.MockPeers{
Peers: []string{
"http://localhost:13001",
"http://localhost:13003",
},
}

var apps [2]*App
var addrs [2]string
var senders [2]*transmission.MockSender
for i := range apps {
var graph inject.Graph
basePort := 13000 + (i * 2)
senders[i] = &transmission.MockSender{}
apps[i], graph = newStartedApp(t, senders[i], basePort, peers, false)
defer startstop.Stop(graph.Objects(), nil)

addrs[i] = "localhost:" + strconv.Itoa(basePort)
}

// Deliver to host 1, it should be passed to host 0 and emitted there.
Expand Down Expand Up @@ -582,15 +564,14 @@ func TestEventsEndpoint(t *testing.T) {
func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
t.Parallel()

peers := &testPeers{
peers: []string{
peers := &peer.MockPeers{
Peers: []string{
"http://localhost:15001",
"http://localhost:15003",
},
}

var apps [2]*App
var addrs [2]string
var senders [2]*transmission.MockSender
for i := range apps {
basePort := 15000 + (i * 2)
Expand All @@ -600,8 +581,6 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
app.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
apps[i] = app
defer startstop.Stop(graph.Objects(), nil)

addrs[i] = "localhost:" + strconv.Itoa(basePort)
}

// this traceID was chosen because it hashes to the appropriate shard for this
Expand Down Expand Up @@ -845,8 +824,8 @@ func BenchmarkDistributedTraces(b *testing.B) {
},
}

peers := &testPeers{
peers: []string{
peers := &peer.MockPeers{
Peers: []string{
"http://localhost:12001",
"http://localhost:12003",
"http://localhost:12005",
Expand Down
36 changes: 28 additions & 8 deletions cmd/refinery/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -29,6 +28,7 @@ import (
"github.com/honeycombio/refinery/internal/peer"
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/pubsub"
"github.com/honeycombio/refinery/sample"
"github.com/honeycombio/refinery/service/debug"
"github.com/honeycombio/refinery/sharder"
Expand Down Expand Up @@ -113,14 +113,32 @@ func main() {
os.Exit(1)
}

ctx, cancel := context.WithTimeout(context.Background(), c.GetPeerTimeout())
defer cancel()
done := make(chan struct{})
peers, err := peer.NewPeers(ctx, c, done)

var peers peer.Peers
var pubsubber pubsub.PubSub
ptype, err := c.GetPeerManagementType()
if err != nil {
fmt.Printf("unable to load peers: %+v\n", err)
os.Exit(1)
panic(err)
}
switch ptype {
case "file":
peers = &peer.FilePeers{Cfg: c}
// we know FilePeers doesn't need to be Started, so we can ask it how many peers we have.
// if we only have one, we can use the local pubsub implementation.
peerList, err := peers.GetPeers()
if err != nil {
panic(err)
}
if len(peerList) == 1 {
pubsubber = &pubsub.LocalPubSub{}
} else {
pubsubber = &pubsub.GoRedisPubSub{}
}
case "redis":
pubsubber = &pubsub.GoRedisPubSub{}
peers = &peer.RedisPubsubPeers{}
default:
// this should have been caught by validation
panic("invalid config option 'PeerManagement.Type'")
}

// upstreamTransport is the http transport used to send things on to Honeycomb
Expand Down Expand Up @@ -182,6 +200,7 @@ func main() {
os.Exit(1)
}

done := make(chan struct{})
stressRelief := &collect.StressRelief{Done: done}
upstreamTransmission := transmit.NewDefaultTransmission(upstreamClient, upstreamMetricsRecorder, "upstream")
peerTransmission := transmit.NewDefaultTransmission(peerClient, peerMetricsRecorder, "peer")
Expand Down Expand Up @@ -221,6 +240,7 @@ func main() {
objects := []*inject.Object{
{Value: c},
{Value: peers},
{Value: pubsubber},
{Value: lgr},
{Value: upstreamTransport, Name: "upstreamTransport"},
{Value: peerTransport, Name: "peerTransport"},
Expand Down
17 changes: 10 additions & 7 deletions config/metadata/configMeta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -831,11 +831,13 @@ groups:
description: >
Peer management is the mechanism by which Refinery locates its peers.
`file` means that Refinery gets its peer list from
the Peers list in this config file.
`file` means that Refinery gets its peer list from the Peers list in
this config file.
`redis` means that Refinery self-registers with a Redis instance and
gets its peer list from there.
`redis` means that Refinery uses a Publish/Subscribe mechanism,
implemented on Redis, to propagate peer lists much more quickly than
the legacy mechanism. This is the recommended setting, especially for
new installations.
- name: Identifier
v1group: PeerManagement
Expand Down Expand Up @@ -902,9 +904,8 @@ groups:
- name: RedisPeerManagement
title: "Redis Peer Management"
description: >
controls how the Refinery cluster communicates
between peers when using Redis. Only applies when `PeerManagement.Type`
is "redis".
controls how the Refinery cluster communicates between peers when using
Redis. Does not apply when `PeerManagement.Type` is "file".
fields:
- name: Host
Expand Down Expand Up @@ -968,6 +969,7 @@ groups:
default: "refinery"
example: "customPrefix"
reload: false
lastversion: v2.6
validations:
- type: notempty
summary: is a string used as a prefix for the keys in Redis while
Expand All @@ -985,6 +987,7 @@ groups:
default: 0
example: 1
reload: false
lastversion: v2.6
validations:
- type: minimum
arg: 0
Expand Down
25 changes: 13 additions & 12 deletions internal/peer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,31 @@ package peer

import "github.com/honeycombio/refinery/config"

type filePeers struct {
c config.Config
type FilePeers struct {
Cfg config.Config `inject:""`
}

// NewFilePeers returns a peers collection backed by the config file
func newFilePeers(c config.Config) Peers {
return &filePeers{
c: c,
}
}

func (p *filePeers) GetPeers() ([]string, error) {
func (p *FilePeers) GetPeers() ([]string, error) {
// we never want to return an empty list of peers, so if the config
// returns an empty list, return a single peer. This keeps the sharding
// logic happy.
peers, err := p.c.GetPeers()
peers, err := p.Cfg.GetPeers()
if len(peers) == 0 {
peers = []string{"http://127.0.0.1:8081"}
}
return peers, err
}

func (p *filePeers) RegisterUpdatedPeersCallback(callback func()) {
func (p *FilePeers) RegisterUpdatedPeersCallback(callback func()) {
// whenever registered, call the callback immediately
// otherwise do nothing since they never change
callback()
}

func (p *FilePeers) Start() error {
return nil
}

func (p *FilePeers) Stop() error {
return nil
}
8 changes: 6 additions & 2 deletions internal/peer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ func TestFilePeers(t *testing.T) {
peers := []string{"peer"}

c := &config.MockConfig{
GetPeersVal: peers,
PeerManagementType: "file",
GetPeersVal: peers,
}
p, err := newPeers(c)
if err != nil {
t.Error(err)
}
p := newFilePeers(c)

if d, _ := p.GetPeers(); !(len(d) == 1 && d[0] == "peer") {
t.Error("received", d, "expected", "[peer]")
Expand Down
9 changes: 9 additions & 0 deletions internal/peer/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ type MockPeers struct {
func (p *MockPeers) GetPeers() ([]string, error) {
return p.Peers, nil
}

func (p *MockPeers) RegisterUpdatedPeersCallback(callback func()) {
callback()
}

func (p *MockPeers) Start() error {
return nil
}

func (p *MockPeers) Stop() error {
return nil
}
26 changes: 4 additions & 22 deletions internal/peer/peers.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,14 @@
package peer

import (
"context"
"errors"

"github.com/honeycombio/refinery/config"
"github.com/facebookgo/startstop"
)

// Peers holds the collection of peers for the cluster
type Peers interface {
GetPeers() ([]string, error)

RegisterUpdatedPeersCallback(callback func())
}

func NewPeers(ctx context.Context, c config.Config, done chan struct{}) (Peers, error) {
t, err := c.GetPeerManagementType()

if err != nil {
return nil, err
}

switch t {
case "file":
return newFilePeers(c), nil
case "redis":
return newRedisPeers(ctx, c, done)
default:
return nil, errors.New("invalid config option 'PeerManagement.Type'")
}
// make it injectable
startstop.Starter
startstop.Stopper
}
Loading

0 comments on commit f8d7c17

Please sign in to comment.