Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: peer management on pubsub via callbacks #1220

Merged
merged 11 commits into from
Jul 3, 2024
39 changes: 9 additions & 30 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,6 @@ func (w *countingWriterSender) waitForCount(t testing.TB, target int) {
}
}

// testPeers is a test stub that holds a fixed list of peer addresses.
type testPeers struct {
peers []string
}

// GetPeers returns the stored peer list; the error is always nil.
func (p *testPeers) GetPeers() ([]string, error) {
return p.peers, nil
}

// RegisterUpdatedPeersCallback is a no-op: the callback is neither stored
// nor invoked.
func (p *testPeers) RegisterUpdatedPeersCallback(callback func()) {
}

func newStartedApp(
t testing.TB,
libhoneyT transmission.Sender,
Expand Down Expand Up @@ -126,8 +115,7 @@ func newStartedApp(

var err error
if peers == nil {
peers, err = peer.NewPeers(context.Background(), c, make(chan struct{}))
assert.NoError(t, err)
peers = &peer.FilePeers{Cfg: c}
}

a := App{}
Expand Down Expand Up @@ -333,24 +321,21 @@ func TestPeerRouting(t *testing.T) {
// Parallel integration tests need different ports!
t.Parallel()

peers := &testPeers{
peers: []string{
peers := &peer.MockPeers{
Peers: []string{
"http://localhost:11001",
"http://localhost:11003",
},
}

var apps [2]*App
var addrs [2]string
var senders [2]*transmission.MockSender
for i := range apps {
var graph inject.Graph
basePort := 11000 + (i * 2)
senders[i] = &transmission.MockSender{}
apps[i], graph = newStartedApp(t, senders[i], basePort, peers, false)
defer startstop.Stop(graph.Objects(), nil)

addrs[i] = "localhost:" + strconv.Itoa(basePort)
}

// Deliver to host 1, it should be passed to host 0 and emitted there.
Expand Down Expand Up @@ -466,24 +451,21 @@ func TestHostMetadataSpanAdditions(t *testing.T) {
func TestEventsEndpoint(t *testing.T) {
t.Parallel()

peers := &testPeers{
peers: []string{
peers := &peer.MockPeers{
Peers: []string{
"http://localhost:13001",
"http://localhost:13003",
},
}

var apps [2]*App
var addrs [2]string
var senders [2]*transmission.MockSender
for i := range apps {
var graph inject.Graph
basePort := 13000 + (i * 2)
senders[i] = &transmission.MockSender{}
apps[i], graph = newStartedApp(t, senders[i], basePort, peers, false)
defer startstop.Stop(graph.Objects(), nil)

addrs[i] = "localhost:" + strconv.Itoa(basePort)
}

// Deliver to host 1, it should be passed to host 0 and emitted there.
Expand Down Expand Up @@ -582,15 +564,14 @@ func TestEventsEndpoint(t *testing.T) {
func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
t.Parallel()

peers := &testPeers{
peers: []string{
peers := &peer.MockPeers{
Peers: []string{
"http://localhost:15001",
"http://localhost:15003",
},
}

var apps [2]*App
var addrs [2]string
var senders [2]*transmission.MockSender
for i := range apps {
basePort := 15000 + (i * 2)
Expand All @@ -600,8 +581,6 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
app.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
apps[i] = app
defer startstop.Stop(graph.Objects(), nil)

addrs[i] = "localhost:" + strconv.Itoa(basePort)
}

// this traceID was chosen because it hashes to the appropriate shard for this
Expand Down Expand Up @@ -845,8 +824,8 @@ func BenchmarkDistributedTraces(b *testing.B) {
},
}

peers := &testPeers{
peers: []string{
peers := &peer.MockPeers{
Peers: []string{
"http://localhost:12001",
"http://localhost:12003",
"http://localhost:12005",
Expand Down
36 changes: 28 additions & 8 deletions cmd/refinery/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -29,6 +28,7 @@ import (
"github.com/honeycombio/refinery/internal/peer"
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/pubsub"
"github.com/honeycombio/refinery/sample"
"github.com/honeycombio/refinery/service/debug"
"github.com/honeycombio/refinery/sharder"
Expand Down Expand Up @@ -113,14 +113,32 @@ func main() {
os.Exit(1)
}

ctx, cancel := context.WithTimeout(context.Background(), c.GetPeerTimeout())
defer cancel()
done := make(chan struct{})
peers, err := peer.NewPeers(ctx, c, done)

var peers peer.Peers
var pubsubber pubsub.PubSub
ptype, err := c.GetPeerManagementType()
if err != nil {
fmt.Printf("unable to load peers: %+v\n", err)
os.Exit(1)
panic(err)
}
switch ptype {
case "file":
peers = &peer.FilePeers{Cfg: c}
// FilePeers doesn't need to be started before use, so we can ask it right away how many peers we have.
// If there is only one, the local pubsub implementation is sufficient; otherwise we use Redis pubsub.
peerList, err := peers.GetPeers()
if err != nil {
panic(err)
}
if len(peerList) == 1 {
pubsubber = &pubsub.LocalPubSub{}
} else {
pubsubber = &pubsub.GoRedisPubSub{}
}
case "redis":
pubsubber = &pubsub.GoRedisPubSub{}
peers = &peer.RedisPubsubPeers{}
default:
// this should have been caught by validation
panic("invalid config option 'PeerManagement.Type'")
}

// upstreamTransport is the http transport used to send things on to Honeycomb
Expand Down Expand Up @@ -182,6 +200,7 @@ func main() {
os.Exit(1)
}

done := make(chan struct{})
stressRelief := &collect.StressRelief{Done: done}
upstreamTransmission := transmit.NewDefaultTransmission(upstreamClient, upstreamMetricsRecorder, "upstream")
peerTransmission := transmit.NewDefaultTransmission(peerClient, peerMetricsRecorder, "peer")
Expand Down Expand Up @@ -221,6 +240,7 @@ func main() {
objects := []*inject.Object{
{Value: c},
{Value: peers},
{Value: pubsubber},
{Value: lgr},
{Value: upstreamTransport, Name: "upstreamTransport"},
{Value: peerTransport, Name: "peerTransport"},
Expand Down
17 changes: 10 additions & 7 deletions config/metadata/configMeta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -831,11 +831,13 @@ groups:
description: >
Peer management is the mechanism by which Refinery locates its peers.

`file` means that Refinery gets its peer list from
the Peers list in this config file.
`file` means that Refinery gets its peer list from the Peers list in
this config file.

`redis` means that Refinery self-registers with a Redis instance and
gets its peer list from there.
`redis` means that Refinery uses a Publish/Subscribe mechanism,
implemented on Redis, to propagate peer lists much more quickly than
the legacy mechanism. This is the recommended setting, especially for
new installations.

- name: Identifier
v1group: PeerManagement
Expand Down Expand Up @@ -902,9 +904,8 @@ groups:
- name: RedisPeerManagement
title: "Redis Peer Management"
description: >
controls how the Refinery cluster communicates
between peers when using Redis. Only applies when `PeerManagement.Type`
is "redis".
controls how the Refinery cluster communicates between peers when using
Redis. Does not apply when `PeerManagement.Type` is "file".

fields:
- name: Host
Expand Down Expand Up @@ -968,6 +969,7 @@ groups:
default: "refinery"
example: "customPrefix"
reload: false
lastversion: v2.6
validations:
- type: notempty
summary: is a string used as a prefix for the keys in Redis while
Expand All @@ -985,6 +987,7 @@ groups:
default: 0
example: 1
reload: false
lastversion: v2.6
validations:
- type: minimum
arg: 0
Expand Down
25 changes: 13 additions & 12 deletions internal/peer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,31 @@ package peer

import "github.com/honeycombio/refinery/config"

type filePeers struct {
c config.Config
type FilePeers struct {
Cfg config.Config `inject:""`
}

// NewFilePeers returns a peers collection backed by the config file
func newFilePeers(c config.Config) Peers {
return &filePeers{
c: c,
}
}

func (p *filePeers) GetPeers() ([]string, error) {
func (p *FilePeers) GetPeers() ([]string, error) {
// we never want to return an empty list of peers, so if the config
// returns an empty list, return a single peer. This keeps the sharding
// logic happy.
peers, err := p.c.GetPeers()
peers, err := p.Cfg.GetPeers()
if len(peers) == 0 {
peers = []string{"http://127.0.0.1:8081"}
}
return peers, err
}

func (p *filePeers) RegisterUpdatedPeersCallback(callback func()) {
func (p *FilePeers) RegisterUpdatedPeersCallback(callback func()) {
// whenever registered, call the callback immediately
// otherwise do nothing since they never change
callback()
}

// Start does nothing and always returns nil.
func (p *FilePeers) Start() error {
return nil
}

// Stop does nothing and always returns nil.
func (p *FilePeers) Stop() error {
return nil
}
8 changes: 6 additions & 2 deletions internal/peer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ func TestFilePeers(t *testing.T) {
peers := []string{"peer"}

c := &config.MockConfig{
GetPeersVal: peers,
PeerManagementType: "file",
GetPeersVal: peers,
}
p, err := newPeers(c)
if err != nil {
t.Error(err)
}
p := newFilePeers(c)

if d, _ := p.GetPeers(); !(len(d) == 1 && d[0] == "peer") {
t.Error("received", d, "expected", "[peer]")
Expand Down
9 changes: 9 additions & 0 deletions internal/peer/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ type MockPeers struct {
// GetPeers returns the Peers field as-is; the error is always nil.
func (p *MockPeers) GetPeers() ([]string, error) {
return p.Peers, nil
}

// RegisterUpdatedPeersCallback invokes callback once, synchronously, at
// registration time. The callback is not retained, so it is never called again.
func (p *MockPeers) RegisterUpdatedPeersCallback(callback func()) {
callback()
}

// Start does nothing and always returns nil.
func (p *MockPeers) Start() error {
return nil
}

// Stop does nothing and always returns nil.
func (p *MockPeers) Stop() error {
return nil
}
26 changes: 4 additions & 22 deletions internal/peer/peers.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,14 @@
package peer

import (
"context"
"errors"

"github.com/honeycombio/refinery/config"
"github.com/facebookgo/startstop"
)

// Peers holds the collection of peers for the cluster
type Peers interface {
GetPeers() ([]string, error)

RegisterUpdatedPeersCallback(callback func())
}

func NewPeers(ctx context.Context, c config.Config, done chan struct{}) (Peers, error) {
t, err := c.GetPeerManagementType()

if err != nil {
return nil, err
}

switch t {
case "file":
return newFilePeers(c), nil
case "redis":
return newRedisPeers(ctx, c, done)
default:
return nil, errors.New("invalid config option 'PeerManagement.Type'")
}
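// make it injectable: embedding Starter and Stopper requires every Peers
// implementation to provide Start and Stop lifecycle methods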
startstop.Starter
startstop.Stopper
}
Loading
Loading