Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: peer management on pubsub via callbacks #1220

Merged
merged 11 commits into from
Jul 3, 2024
27 changes: 8 additions & 19 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,6 @@ func (w *countingWriterSender) waitForCount(t testing.TB, target int) {
}
}

type testPeers struct {
peers []string
}

func (p *testPeers) GetPeers() ([]string, error) {
return p.peers, nil
}

func (p *testPeers) RegisterUpdatedPeersCallback(callback func()) {
}

func newStartedApp(
t testing.TB,
libhoneyT transmission.Sender,
Expand Down Expand Up @@ -331,8 +320,8 @@ func TestPeerRouting(t *testing.T) {
// Parallel integration tests need different ports!
t.Parallel()

peers := &testPeers{
peers: []string{
peers := &peer.MockPeers{
Peers: []string{
"http://localhost:11001",
"http://localhost:11003",
},
Expand Down Expand Up @@ -464,8 +453,8 @@ func TestHostMetadataSpanAdditions(t *testing.T) {
func TestEventsEndpoint(t *testing.T) {
t.Parallel()

peers := &testPeers{
peers: []string{
peers := &peer.MockPeers{
Peers: []string{
"http://localhost:13001",
"http://localhost:13003",
},
Expand Down Expand Up @@ -580,8 +569,8 @@ func TestEventsEndpoint(t *testing.T) {
func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
t.Parallel()

peers := &testPeers{
peers: []string{
peers := &peer.MockPeers{
Peers: []string{
"http://localhost:15001",
"http://localhost:15003",
},
Expand Down Expand Up @@ -843,8 +832,8 @@ func BenchmarkDistributedTraces(b *testing.B) {
},
}

peers := &testPeers{
peers: []string{
peers := &peer.MockPeers{
Peers: []string{
"http://localhost:12001",
"http://localhost:12003",
"http://localhost:12005",
Expand Down
20 changes: 14 additions & 6 deletions config/metadata/configMeta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ groups:
v1name: Type
type: string
valuetype: choice
choices: ["redis", "file"]
choices: ["pubsub", "redis", "file"]
default: "file"
reload: false
validations:
Expand All @@ -834,8 +834,13 @@ groups:
`file` means that Refinery gets its peer list from
the Peers list in this config file.

`redis` means that Refinery self-registers with a Redis instance and
gets its peer list from there.
`pubsub` means that Refinery uses a Publish/Subscribe mechanism,
implemented on Redis, to propagate peer lists much more quickly than
the legacy mechanism. This is the recommended setting, especially for
new installations.

`redis` is the legacy mechanism where Refinery keeps a peer list as
data within a Redis instance.

- name: Identifier
v1group: PeerManagement
Expand Down Expand Up @@ -902,9 +907,8 @@ groups:
- name: RedisPeerManagement
title: "Redis Peer Management"
description: >
controls how the Refinery cluster communicates
between peers when using Redis. Only applies when `PeerManagement.Type`
is "redis".
controls how the Refinery cluster communicates between peers when using
Redis. Does not apply when `PeerManagement.Type` is "file".

fields:
- name: Host
Expand Down Expand Up @@ -977,6 +981,8 @@ groups:
Refinery clusters or multiple applications want to share a single
Redis instance. It may not be blank.

Only applies when using the `redis` peer management type.

- name: Database
v1group: PeerManagement
v1name: Database
Expand All @@ -997,6 +1003,8 @@ groups:
this in any situation where multiple Refinery clusters or multiple
applications want to share a single Redis instance.

Only applies when using the `redis` peer management type.

- name: UseTLS
v1group: PeerManagement
v1name: UseTLS
Expand Down
8 changes: 8 additions & 0 deletions internal/peer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,11 @@ func (p *filePeers) RegisterUpdatedPeersCallback(callback func()) {
// otherwise do nothing since they never change
callback()
}

func (p *filePeers) Start() error {
return nil
}

func (p *filePeers) Stop() error {
return nil
}
9 changes: 9 additions & 0 deletions internal/peer/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ type MockPeers struct {
func (p *MockPeers) GetPeers() ([]string, error) {
return p.Peers, nil
}

func (p *MockPeers) RegisterUpdatedPeersCallback(callback func()) {
callback()
}

func (p *MockPeers) Start() error {
return nil
}

func (p *MockPeers) Stop() error {
return nil
}
7 changes: 6 additions & 1 deletion internal/peer/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import (
"context"
"errors"

"github.com/facebookgo/startstop"
"github.com/honeycombio/refinery/config"
)

// Peers holds the collection of peers for the cluster
type Peers interface {
GetPeers() ([]string, error)

RegisterUpdatedPeersCallback(callback func())
// make it injectable
startstop.Starter
startstop.Stopper
}

func NewPeers(ctx context.Context, c config.Config, done chan struct{}) (Peers, error) {
Expand All @@ -26,6 +29,8 @@ func NewPeers(ctx context.Context, c config.Config, done chan struct{}) (Peers,
return newFilePeers(c), nil
case "redis":
return newRedisPeers(ctx, c, done)
case "pubsub":
return newPubsubPeers(c)
default:
return nil, errors.New("invalid config option 'PeerManagement.Type'")
}
Expand Down
Loading
Loading