Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: publish refinery instance ID during pubsub peer comms #1417

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/refinery/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/facebookgo/inject"
"github.com/facebookgo/startstop"
"github.com/google/uuid"
libhoney "github.com/honeycombio/libhoney-go"
"github.com/honeycombio/libhoney-go/transmission"
"github.com/jonboulle/clockwork"
Expand All @@ -42,6 +43,9 @@ import (
var BuildID string
var version string

// instanceID is a unique identifier for this instance of refinery.
// It is generated once, when the package is initialized, by uuid.NewString,
// and main provides it to the dependency-injection graph under the name
// "instanceID".
var instanceID = uuid.NewString()

// graphLogger is a stateless type with no fields.
// NOTE(review): its methods are outside this excerpt; presumably it exists
// only to carry logging methods for the injection graph — confirm against the
// rest of main.go.
type graphLogger struct {
}

Expand Down Expand Up @@ -265,6 +269,7 @@ func main() {
{Value: refineryHealth},
{Value: &configwatcher.ConfigWatcher{}},
{Value: &a},
{Value: instanceID, Name: "instanceID"},
}
err = g.Provide(objects...)
if err != nil {
Expand Down
9 changes: 2 additions & 7 deletions generics/setttl.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package generics

import (
"cmp"
"sort"
"sync"
"time"

Expand All @@ -13,15 +11,15 @@ import (
// SetWithTTL is a unique set of items with a TTL (time to live) for each item.
// After the TTL expires, the item is automatically removed from the set when either Members or Length is called.
// It is safe for concurrent use.
type SetWithTTL[T cmp.Ordered] struct {
type SetWithTTL[T comparable] struct {
Items map[T]time.Time
TTL time.Duration
Clock clockwork.Clock
mut sync.RWMutex
}

// NewSetWithTTL returns a new SetWithTTL with elements `es` and a TTL of `ttl`.
func NewSetWithTTL[T cmp.Ordered](ttl time.Duration, es ...T) *SetWithTTL[T] {
func NewSetWithTTL[T comparable](ttl time.Duration, es ...T) *SetWithTTL[T] {
s := &SetWithTTL[T]{
Items: make(map[T]time.Time, len(es)),
TTL: ttl,
Expand Down Expand Up @@ -77,9 +75,6 @@ func (s *SetWithTTL[T]) Members() []T {
s.mut.RLock()
members := maps.Keys(s.Items)
s.mut.RUnlock()
sort.Slice(members, func(i, j int) bool {
return cmp.Less(members[i], members[j])
})
return members
}

Expand Down
1 change: 1 addition & 0 deletions internal/peer/peers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func newPeers(c config.Config) (Peers, error) {
{Value: &metrics.NullMetrics{}, Name: "metrics"},
{Value: &logger.NullLogger{}},
{Value: clockwork.NewFakeClock()},
{Value: "testID", Name: "instanceID"},
}
err := g.Provide(objects...)
if err != nil {
Expand Down
75 changes: 49 additions & 26 deletions internal/peer/pubsub_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/rand"
"net"
"os"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -40,43 +41,56 @@ const (
)

// peerCommand is one message exchanged on the "peers" pubsub channel.
// Its wire form, produced by marshal and parsed by unmarshal, is
// "<action><address>,<id>".
type peerCommand struct {
	// action says what the sender wants done with the peer (for example
	// Register or Unregister).
	action peerAction
	// address is the network address of the peer the command is about.
	address string
	// id is the instance ID of the refinery process that sent the command.
	id string
}

func newPeerCommand(action peerAction, peer string) *peerCommand {
func newPeerCommand(action peerAction, address string, id string) *peerCommand {
return &peerCommand{
action: action,
peer: peer,
action: action,
address: address,
id: id,
}
}

func (p *peerCommand) unmarshal(msg string) bool {
if len(msg) < 2 {
idx := strings.Index(msg, ",")
if len(msg) < 2 || idx == -1 {
return false
}
// first letter indicates the action (eg register, unregister)
p.action = peerAction(msg[:1])
p.peer = msg[1:]
switch p.action {
case Register, Unregister:
// the remainder is the peer address and ID, separated by a comma
msgData := msg[1:]
p.address = msgData[:idx-1]
p.id = msgData[idx:]
return true
default:
return false
}
}

// marshal renders the command in its wire form "<action><address>,<id>",
// which is the format unmarshal parses.
func (p *peerCommand) marshal() string {
	return string(p.action) + p.address + "," + p.id
}

var _ Peers = (*RedisPubsubPeers)(nil)

// peerRecord describes one peer as tracked by RedisPubsubPeers. It is the
// element type of the peers SetWithTTL, so two records are the same set member
// only when both fields are equal.
type peerRecord struct {
// id is the instance ID taken from the peer's command (or InstanceID for
// this process).
id string
// address is the peer's address taken from the same command.
address string
}

type RedisPubsubPeers struct {
Config config.Config `inject:""`
Metrics metrics.Metrics `inject:"metrics"`
Logger logger.Logger `inject:""`
PubSub pubsub.PubSub `inject:""`
Clock clockwork.Clock `inject:""`
Config config.Config `inject:""`
Metrics metrics.Metrics `inject:"metrics"`
Logger logger.Logger `inject:""`
PubSub pubsub.PubSub `inject:""`
Clock clockwork.Clock `inject:""`
InstanceID string `inject:"instanceID"`

// Done is a channel that will be closed when the service should stop.
// After it is closed, peers service should signal the rest of the cluster
Expand All @@ -85,7 +99,7 @@ type RedisPubsubPeers struct {
// since the pubsub subscription is still active.
Done chan struct{}

peers *generics.SetWithTTL[string]
peers *generics.SetWithTTL[peerRecord]
hash uint64
callbacks []func()
sub pubsub.Subscription
Expand Down Expand Up @@ -113,9 +127,9 @@ func (p *RedisPubsubPeers) listen(ctx context.Context, msg string) {
p.Metrics.Count("peer_messages", 1)
switch cmd.action {
case Unregister:
p.peers.Remove(cmd.peer)
p.peers.Remove(peerRecord{id: cmd.id, address: cmd.address})
case Register:
p.peers.Add(cmd.peer)
p.peers.Add(peerRecord{id: cmd.id, address: cmd.address})
}
p.checkHash()
}
Expand All @@ -138,7 +152,7 @@ func (p *RedisPubsubPeers) Start() error {
p.Logger = &logger.NullLogger{}
}

p.peers = generics.NewSetWithTTL[string](PeerEntryTimeout)
p.peers = generics.NewSetWithTTL[peerRecord](PeerEntryTimeout)
p.callbacks = make([]func(), 0)
p.Logger.Info().Logf("subscribing to pubsub peers channel")
p.sub = p.PubSub.Subscribe(context.Background(), "peers", p.listen)
Expand All @@ -151,7 +165,7 @@ func (p *RedisPubsubPeers) Start() error {
if err != nil {
return err
}
p.peers.Add(myaddr)
p.peers.Add(peerRecord{id: p.InstanceID, address: myaddr})
return nil
}

Expand Down Expand Up @@ -182,7 +196,7 @@ func (p *RedisPubsubPeers) Ready() error {

// publish our presence periodically
ctx, cancel := context.WithTimeout(context.Background(), p.Config.GetPeerTimeout())
err := p.PubSub.Publish(ctx, "peers", newPeerCommand(Register, myaddr).marshal())
err := p.PubSub.Publish(ctx, "peers", newPeerCommand(Register, myaddr, p.InstanceID).marshal())
if err != nil {
p.Logger.Error().WithFields(map[string]interface{}{
"error": err,
Expand Down Expand Up @@ -214,7 +228,7 @@ func (p *RedisPubsubPeers) stop() {
return
}

err = p.PubSub.Publish(context.Background(), "peers", newPeerCommand(Unregister, myaddr).marshal())
err = p.PubSub.Publish(context.Background(), "peers", newPeerCommand(Unregister, myaddr, p.InstanceID).marshal())
if err != nil {
p.Logger.Error().WithFields(map[string]interface{}{
"error": err,
Expand All @@ -233,9 +247,14 @@ func (p *RedisPubsubPeers) GetPeers() ([]string, error) {
if err != nil {
return nil, err
}
peers = []string{myaddr}
return []string{myaddr}, nil
}

var peerAddresses []string
for _, peer := range peers {
peerAddresses = append(peerAddresses, peer.address)
}
return peers, nil
return peerAddresses, nil
}

func (p *RedisPubsubPeers) GetInstanceID() (string, error) {
Expand Down Expand Up @@ -324,11 +343,15 @@ func (p *RedisPubsubPeers) getIdentifierFromInterface() (string, error) {
return myIdentifier, nil
}

// hashList hashes a list of strings into a single uint64
func hashList(list []string) uint64 {
// hashList hashes a list of peerRecord's into a single uint64
func hashList(list []peerRecord) uint64 {
var h uint64 = 255798297204 // arbitrary seed
for _, s := range list {
h = wyhash.Hash([]byte(s), h)
// sort the list so we get a consistent hash
sort.Slice(list, func(i, j int) bool {
return list[i].id < list[j].id
})
for _, peer := range list {
h = wyhash.Hash([]byte(peer.id), h)
}
return h
}
12 changes: 7 additions & 5 deletions internal/peer/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,17 @@ func Test_publicAddr(t *testing.T) {
}

func TestPeerActions(t *testing.T) {
cmd := newPeerCommand(Register, "foo")
assert.Equal(t, "Rfoo", cmd.marshal())
assert.Equal(t, "foo", cmd.peer)
cmd := newPeerCommand(Register, "foo", "id1")
assert.Equal(t, "Rfoo,id1", cmd.marshal())
assert.Equal(t, "foo", cmd.address)
assert.Equal(t, Register, cmd.action)
assert.Equal(t, "id1", cmd.id)
cmd2 := peerCommand{}
b := cmd2.unmarshal("Ubar")
b := cmd2.unmarshal("Ubar,id2")
assert.True(t, b)
assert.Equal(t, "bar", cmd2.peer)
assert.Equal(t, "bar", cmd2.address)
assert.Equal(t, Unregister, cmd2.action)
assert.Equal(t, "id2", cmd2.id)

b = cmd2.unmarshal("invalid")
assert.False(t, b)
Expand Down