On shutdown, remove ourself from the peers list #569

Merged: 1 commit, Nov 28, 2022

4 changes: 1 addition & 3 deletions app/app_test.go
@@ -1,5 +1,3 @@
-//go:build all || race
Contributor Author: We don't need these build flags any more, so I removed them.

package app

import (
@@ -123,7 +121,7 @@ func newStartedApp(

	var err error
	if peers == nil {
-		peers, err = peer.NewPeers(context.Background(), c)
+		peers, err = peer.NewPeers(context.Background(), c, make(chan struct{}))
		assert.NoError(t, err)
	}

6 changes: 5 additions & 1 deletion cmd/refinery/main.go
@@ -106,7 +106,8 @@ func main() {

	ctx, cancel := context.WithTimeout(context.Background(), c.GetPeerTimeout())
	defer cancel()
-	peers, err := peer.NewPeers(ctx, c)
+	done := make(chan struct{})
+	peers, err := peer.NewPeers(ctx, c, done)

	if err != nil {
		fmt.Printf("unable to load peers: %+v\n", err)
@@ -226,5 +227,8 @@ func main() {

	// block on our signal handler to exit
	sig := <-sigsToExit
+	// unregister ourselves before we go
+	close(done)
+	time.Sleep(100 * time.Millisecond)
	a.Logger.Error().Logf("Caught signal \"%s\"", sig)
}
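
The shutdown sequence above relies on a common Go pattern: background goroutines select on a shared done channel, and closing that channel broadcasts shutdown to all of them at once. A minimal, self-contained sketch of the pattern (the worker and timings here are illustrative, not Refinery's actual code):

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	done := make(chan struct{})

	// a background worker that must clean up before the process exits
	go func() {
		tk := time.NewTicker(time.Second)
		defer tk.Stop()
		for {
			select {
			case <-tk.C:
				// periodic work, e.g. re-registering in a peer list
			case <-done:
				fmt.Println("worker cleaned up") // e.g. unregister from the list
				return
			}
		}
	}()

	// wait for a shutdown signal
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs

	close(done)                        // broadcast shutdown to all workers
	time.Sleep(100 * time.Millisecond) // brief grace period, as in the diff above
}

The 100ms sleep is a pragmatic grace period rather than a synchronization guarantee; a sync.WaitGroup would make the handoff deterministic at the cost of a little plumbing.
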
2 changes: 0 additions & 2 deletions config/config_test_reload_error_test.go
@@ -1,5 +1,3 @@
-//go:build all || !race
-
package config

import (
5 changes: 3 additions & 2 deletions internal/peer/peers.go
@@ -3,6 +3,7 @@ package peer
import (
	"context"
	"errors"
+
	"github.com/honeycombio/refinery/config"
)

@@ -13,7 +14,7 @@ type Peers interface {
	RegisterUpdatedPeersCallback(callback func())
}

-func NewPeers(ctx context.Context, c config.Config) (Peers, error) {
+func NewPeers(ctx context.Context, c config.Config, done chan struct{}) (Peers, error) {
	t, err := c.GetPeerManagementType()

	if err != nil {
@@ -24,7 +25,7 @@ func NewPeers(ctx context.Context, c config.Config) (Peers, error) {
case "file":
return newFilePeers(c), nil
case "redis":
return newRedisPeers(ctx, c)
return newRedisPeers(ctx, c, done)
default:
return nil, errors.New("invalid config option 'PeerManagement.Type'")
}
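
With the new signature, callers own the peer manager's lifetime through the channel they pass in; closing it both stops the heartbeat and removes the host from Redis. A hedged usage sketch (startPeers is a hypothetical wrapper, not part of this PR; it assumes peers.go's imports plus "time"):

// startPeers wires up a peer manager whose background goroutines
// exit when the returned stop function is called.
func startPeers(ctx context.Context, cfg config.Config) (Peers, func(), error) {
	done := make(chan struct{})
	peers, err := NewPeers(ctx, cfg, done)
	if err != nil {
		return nil, nil, err
	}
	stop := func() {
		close(done)                        // stops the heartbeat and unregisters from Redis
		time.Sleep(100 * time.Millisecond) // grace period for the final cleanup, as in main.go
	}
	return peers, stop, nil
}
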
35 changes: 33 additions & 2 deletions internal/peer/peers_test.go
@@ -2,6 +2,7 @@ package peer

import (
	"context"
+	"strings"
	"testing"
	"time"

@@ -16,7 +17,9 @@ func TestNewPeers(t *testing.T) {
		PeerTimeout: 5 * time.Second,
	}

-	p, err := NewPeers(context.Background(), c)
+	done := make(chan struct{})
+	defer close(done)
+	p, err := NewPeers(context.Background(), c, done)
	assert.NoError(t, err)
	require.NotNil(t, p)

@@ -32,7 +35,7 @@ func TestNewPeers(t *testing.T) {
		PeerTimeout: 5 * time.Second,
	}

-	p, err = NewPeers(context.Background(), c)
+	p, err = NewPeers(context.Background(), c, done)
	assert.NoError(t, err)
	require.NotNil(t, p)

@@ -42,3 +45,31 @@
		t.Errorf("received %T expected %T", i, &redisPeers{})
	}
}
+
+func TestPeerShutdown(t *testing.T) {
+	c := &config.MockConfig{
+		GetPeerListenAddrVal: "0.0.0.0:8081",
+		PeerManagementType:   "redis",
+		PeerTimeout:          5 * time.Second,
+	}
+
+	done := make(chan struct{})
+	p, err := NewPeers(context.Background(), c, done)
+	assert.NoError(t, err)
+	require.NotNil(t, p)
+
+	peer, ok := p.(*redisPeers)
+	assert.True(t, ok)
+
+	peers, err := peer.GetPeers()
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(peers))
+	assert.True(t, strings.HasPrefix(peers[0], "http"))
+	assert.True(t, strings.HasSuffix(peers[0], "8081"))
+
+	close(done)
+	time.Sleep(100 * time.Millisecond)
+	peers, err = peer.GetPeers()
+	assert.NoError(t, err)
+	assert.Equal(t, 0, len(peers))
+}
89 changes: 53 additions & 36 deletions internal/peer/redis.go
@@ -45,7 +45,7 @@ type redisPeers struct {
}

// NewRedisPeers returns a peers collection backed by redis
-func newRedisPeers(ctx context.Context, c config.Config) (Peers, error) {
+func newRedisPeers(ctx context.Context, c config.Config, done chan struct{}) (Peers, error) {
	redisHost, _ := c.GetRedisHost()

	if redisHost == "" {
@@ -108,15 +108,15 @@ func newRedisPeers(ctx context.Context, c config.Config) (Peers, error) {
	}

	// go establish a regular registration heartbeat to ensure I stay alive in redis
-	go peers.registerSelf()
+	go peers.registerSelf(done)

	// get our peer list once to seed ourselves
	peers.updatePeerListOnce()

	// go watch the list of peers and trigger callbacks whenever it changes.
	// populate my local list of peers so each request can hit memory and only hit
	// redis on a ticker
-	go peers.watchPeers()
+	go peers.watchPeers(done)

	return peers, nil
}
@@ -135,15 +135,24 @@ func (p *redisPeers) RegisterUpdatedPeersCallback(cb func()) {

// registerSelf inserts self into the peer list and updates self's entry on a
// regular basis so it doesn't time out and get removed from the list of peers.
-// If this function stops, this host will get ejected from other's peer lists.
-func (p *redisPeers) registerSelf() {
+// When this function stops, it tries to remove the registered key.
+func (p *redisPeers) registerSelf(done chan struct{}) {
	tk := time.NewTicker(refreshCacheInterval)
-	for range tk.C {
-		ctx, cancel := context.WithTimeout(context.Background(), p.c.GetPeerTimeout())
-		// every 5 seconds, insert a 30sec timeout record. we ignore the error
-		// here since Register() logs the error for us.
-		p.store.Register(ctx, p.publicAddr, peerEntryTimeout)
-		cancel()
+	for {
+		select {
+		case <-tk.C:
+			ctx, cancel := context.WithTimeout(context.Background(), p.c.GetPeerTimeout())
+			// every interval, insert a timeout record. we ignore the error
+			// here since Register() logs the error for us.
+			p.store.Register(ctx, p.publicAddr, peerEntryTimeout)
+			cancel()
+		case <-done:
+			// unregister ourselves
+			ctx, cancel := context.WithTimeout(context.Background(), p.c.GetPeerTimeout())
+			p.store.Unregister(ctx, p.publicAddr)
+			cancel()
+			return
+		}
	}
}

@@ -168,38 +177,46 @@ func (p *redisPeers) updatePeerListOnce() {
	p.peerLock.Unlock()
}

-func (p *redisPeers) watchPeers() {
+func (p *redisPeers) watchPeers(done chan struct{}) {
	oldPeerList := p.peers
	sort.Strings(oldPeerList)
	tk := time.NewTicker(refreshCacheInterval)

-	for range tk.C {
-		ctx, cancel := context.WithTimeout(context.Background(), p.c.GetPeerTimeout())
-		currentPeers, err := p.store.GetMembers(ctx)
-		cancel()
-
-		if err != nil {
-			logrus.WithError(err).
-				WithFields(logrus.Fields{
-					"name":     p.publicAddr,
-					"timeout":  p.c.GetPeerTimeout().String(),
-					"oldPeers": oldPeerList,
-				}).
-				Error("get members failed during watch")
-			continue
-		}
-
-		sort.Strings(currentPeers)
-		if !equal(oldPeerList, currentPeers) {
-			// update peer list and trigger callbacks saying the peer list has changed
-			p.peerLock.Lock()
-			p.peers = currentPeers
-			oldPeerList = currentPeers
-			p.peerLock.Unlock()
-			for _, callback := range p.callbacks {
-				// don't block on any of the callbacks.
-				go callback()
-			}
-		}
+	for {
+		select {
+		case <-tk.C:
+			ctx, cancel := context.WithTimeout(context.Background(), p.c.GetPeerTimeout())
+			currentPeers, err := p.store.GetMembers(ctx)
+			cancel()
+
+			if err != nil {
+				logrus.WithError(err).
+					WithFields(logrus.Fields{
+						"name":     p.publicAddr,
+						"timeout":  p.c.GetPeerTimeout().String(),
+						"oldPeers": oldPeerList,
+					}).
+					Error("get members failed during watch")
+				continue
+			}
+
+			sort.Strings(currentPeers)
+			if !equal(oldPeerList, currentPeers) {
+				// update peer list and trigger callbacks saying the peer list has changed
+				p.peerLock.Lock()
+				p.peers = currentPeers
+				oldPeerList = currentPeers
+				p.peerLock.Unlock()
+				for _, callback := range p.callbacks {
+					// don't block on any of the callbacks.
+					go callback()
+				}
+			}
+		case <-done:
+			p.peerLock.Lock()
+			p.peers = []string{}
+			p.peerLock.Unlock()
+			for _, callback := range p.callbacks {
+				// don't block on any of the callbacks.
+				go callback()
+			}
+			return
+		}
	}
}
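
One detail worth noting: on the done path, watchPeers empties the local peer list and fires the registered callbacks, so downstream consumers observe this host's shutdown the same way they observe any other membership change. A sketch of such a consumer (rebuildShardMap is a hypothetical stand-in for what a sharder would do with the fresh list):

// watchMembership registers a callback that re-reads the peer list on
// every change; it relies only on the Peers interface shown earlier.
func watchMembership(p Peers) {
	p.RegisterUpdatedPeersCallback(func() {
		// fires on any membership change, including this host shutting down
		current, err := p.GetPeers()
		if err != nil {
			return
		}
		rebuildShardMap(current)
	})
}

func rebuildShardMap(peers []string) { /* recompute whatever depends on the list */ }
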
31 changes: 27 additions & 4 deletions internal/redimem/redimem.go
@@ -20,6 +20,10 @@ type Membership interface {
	// in order to remain a member of the group.
	Register(ctx context.Context, memberName string, timeout time.Duration) error

+	// Unregister removes a name from the list immediately. It's intended to be
+	// used during shutdown so that there's no delay in the case of deliberate downsizing.
+	Unregister(ctx context.Context, memberName string) error
+
	// GetMembers retrieves the list of all currently registered members. Members
	// that have registered but timed out will not be returned.
	GetMembers(ctx context.Context) ([]string, error)
@@ -87,6 +91,27 @@ func (rm *RedisMembership) Register(ctx context.Context, memberName string, timeout time.Duration) error {
	return nil
}

+func (rm *RedisMembership) Unregister(ctx context.Context, memberName string) error {
+	err := rm.validateDefaults()
+	if err != nil {
+		return err
+	}
+	key := fmt.Sprintf("%s•%s•%s", globalPrefix, rm.Prefix, memberName)
+	conn, err := rm.Pool.GetContext(ctx)
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+	_, err = conn.Do("DEL", key)
+	if err != nil {
+		logrus.WithField("name", memberName).
+			WithField("err", err).
+			Error("unregistration failed")
+		return err
+	}
+	return nil
+}

// GetMembers reaches out to Redis to retrieve a list of all members in the
// cluster. It does this multiple times (how many is configured on
// initialization) and takes the union of the results returned.
@@ -189,10 +214,8 @@ func (rm *RedisMembership) scan(conn redis.Conn, pattern, count string, timeout
			break
		}

-		if keys != nil {
-			for _, key := range keys {
-				keyChan <- key
-			}
-		}
+		for _, key := range keys {
+			keyChan <- key
+		}

		// redis will return 0 when we have iterated over the entire set
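
The Membership contract now has three methods: Register with a timeout, immediate Unregister, and GetMembers. A minimal in-memory sketch of that contract (not part of this PR; expiry is checked lazily on read, which is enough for a test double):

import (
	"context"
	"sync"
	"time"
)

// memMembership is a hypothetical in-memory stand-in for the Membership
// interface, handy as a test double.
type memMembership struct {
	mu      sync.Mutex
	members map[string]time.Time // member name -> entry expiry
}

func newMemMembership() *memMembership {
	return &memMembership{members: make(map[string]time.Time)}
}

func (m *memMembership) Register(ctx context.Context, memberName string, timeout time.Duration) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.members[memberName] = time.Now().Add(timeout)
	return nil
}

func (m *memMembership) Unregister(ctx context.Context, memberName string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.members, memberName) // immediate removal, mirroring the Redis DEL
	return nil
}

func (m *memMembership) GetMembers(ctx context.Context) ([]string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	out := []string{}
	now := time.Now()
	for name, expiry := range m.members {
		if expiry.After(now) { // entries that timed out are filtered, as in Redis
			out = append(out, name)
		}
	}
	return out, nil
}
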
53 changes: 53 additions & 0 deletions sample/rules_test.go
@@ -538,6 +538,59 @@ func TestRules(t *testing.T) {
			ExpectedKeep: true,
			ExpectedRate: 1,
		},
+		{
+			Rules: &config.RulesBasedSamplerConfig{
+				Rule: []*config.RulesBasedSamplerRule{
+					{
+						Name:       "Check root span for span count",
+						Drop:       true,
+						SampleRate: 0,
+						Condition: []*config.RulesBasedSamplerCondition{
+							{
+								Field:    "meta.span_count",
+								Operator: ">=",
+								Value:    int(2),
+							},
+						},
+					},
+				},
+			},
+			Spans: []*types.Span{
+				{
+					Event: types.Event{
+						Data: map[string]interface{}{
+							"trace.trace_id":  "12345",
+							"trace.span_id":   "54321",
+							"meta.span_count": int64(2),
+							"test":            int64(2),
+						},
+					},
+				},
+				{
+					Event: types.Event{
+						Data: map[string]interface{}{
+							"trace.trace_id":  "12345",
+							"trace.span_id":   "654321",
+							"trace.parent_id": "54321",
+							"test":            int64(2),
+						},
+					},
+				},
+				{
+					Event: types.Event{
+						Data: map[string]interface{}{
+							"trace.trace_id":  "12345",
+							"trace.span_id":   "754321",
+							"trace.parent_id": "54321",
+							"test":            int64(3),
+						},
+					},
+				},
+			},
+			ExpectedName: "Check root span for span count",
+			ExpectedKeep: false,
+			ExpectedRate: 0,
+		},
	}

	for _, d := range data {
8 changes: 6 additions & 2 deletions sharder/deterministic_test.go
@@ -26,7 +26,9 @@ func TestWhichShard(t *testing.T) {
		GetPeersVal:        peers,
		PeerManagementType: "file",
	}
-	filePeers, err := peer.NewPeers(context.Background(), config)
+	done := make(chan struct{})
+	defer close(done)
+	filePeers, err := peer.NewPeers(context.Background(), config, done)
	assert.Equal(t, nil, err)
	sharder := DeterministicSharder{
		Config: config,
@@ -67,7 +69,9 @@ func TestWhichShardAtEdge(t *testing.T) {
		GetPeersVal:        peers,
		PeerManagementType: "file",
	}
-	filePeers, err := peer.NewPeers(context.Background(), config)
+	done := make(chan struct{})
+	defer close(done)
+	filePeers, err := peer.NewPeers(context.Background(), config, done)
	assert.Equal(t, nil, err)
	sharder := DeterministicSharder{
		Config: config,