feat: unregister peer asap on shutdown #1260

Merged (5 commits, Aug 1, 2024)
cmd/refinery/main.go (6 additions, 3 deletions)
@@ -116,6 +116,10 @@ func main() {
os.Exit(1)
}

+// when refinery receives a shutdown signal, we need to
+// immediately let its peers know so they can stop sending
+// data to it.
+done := make(chan struct{})
// set up the peer management and pubsub implementations
var peers peer.Peers
var pubsubber pubsub.PubSub
@@ -125,11 +129,11 @@
// In the case of file peers, we do not use Redis for anything, including pubsub, so
// we use the local pubsub implementation. Even if we have multiple peers, these
// peers cannot communicate using pubsub.
-peers = &peer.FilePeers{}
+peers = &peer.FilePeers{Done: done}
pubsubber = &pubsub.LocalPubSub{}
case "redis":
// if we're using redis, we need to set it up for both peers and pubsub
-peers = &peer.RedisPubsubPeers{}
+peers = &peer.RedisPubsubPeers{Done: done}
pubsubber = &pubsub.GoRedisPubSub{}
default:
// this should have been caught by validation
@@ -195,7 +199,6 @@ func main() {
os.Exit(1)
}

-done := make(chan struct{})
stressRelief := &collect.StressRelief{Done: done}
upstreamTransmission := transmit.NewDefaultTransmission(upstreamClient, upstreamMetricsRecorder, "upstream")
peerTransmission := transmit.NewDefaultTransmission(peerClient, peerMetricsRecorder, "peer")
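Note on the change above: the shared done channel is now created before the peer implementations are constructed, so FilePeers and RedisPubsubPeers can watch it for shutdown. The signal handling that closes done is not part of this diff; the following is a minimal sketch of the assumed wiring, where everything except the done channel itself is illustrative.

// Sketch only: how main.go presumably connects an OS shutdown signal to the
// shared done channel. Only `done` and the Done fields come from this diff;
// the signal handling shown here is an assumption for illustration.
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	done := make(chan struct{})

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		<-sigs      // a shutdown signal arrived
		close(done) // every component selecting on done (peers, stress relief) sees this at once
	}()

	fmt.Println("running; press Ctrl-C to trigger the shutdown path")
	<-done
	fmt.Println("done closed: peers have been told to unregister")
}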
collect/stressRelief.go (3 additions, 1 deletion)
@@ -10,6 +10,7 @@ import (
"time"

"github.com/dgryski/go-wyhash"
"github.com/facebookgo/startstop"
"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/internal/health"
"github.com/honeycombio/refinery/internal/peer"
@@ -22,12 +23,13 @@ import (
const stressReliefTopic = "refinery-stress-relief"

type StressReliever interface {
-Start() error
UpdateFromConfig(cfg config.StressReliefConfig)
Recalc() uint
Stressed() bool
GetSampleRate(traceID string) (rate uint, keep bool, reason string)
ShouldSampleDeterministically(traceID string) bool
+
+startstop.Starter
}

var _ StressReliever = &MockStressReliever{}
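Note on the interface change above: embedding startstop.Starter carries the same Start() error method the interface previously declared inline, so existing implementations continue to satisfy StressReliever. A minimal sketch of that equivalence, using local stand-in types rather than refinery's own:

// Sketch: embedding a Starter interface yields the same method set as
// declaring Start() error inline. Types here are stand-ins, not refinery's.
package main

import "fmt"

// Assumed to mirror startstop.Starter's definition.
type Starter interface{ Start() error }

type StressReliever interface {
	Starter // equivalent to declaring Start() error directly
	Stressed() bool
}

type mockReliever struct{}

func (mockReliever) Start() error   { return nil }
func (mockReliever) Stressed() bool { return false }

func main() {
	var sr StressReliever = mockReliever{}
	fmt.Println(sr.Start(), sr.Stressed())
}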
collect/stress_relief_test.go (1 deletion)
@@ -288,7 +288,6 @@ func newStressRelief(t *testing.T, clock clockwork.Clock, channel pubsub.PubSub)

return sr, func() {
require.NoError(t, healthReporter.Stop())
-require.NoError(t, peer.Stop())
require.NoError(t, channel.Stop())
}
}
internal/peer/file.go (2 additions, 5 deletions)
@@ -8,11 +8,12 @@ import (
"github.com/honeycombio/refinery/metrics"
)

-var _ Peers = &FilePeers{}
+var _ Peers = (*FilePeers)(nil)

type FilePeers struct {
Cfg config.Config `inject:""`
Metrics metrics.Metrics `inject:"metrics"`
+Done chan struct{}

id string
}
@@ -55,10 +56,6 @@ func (p *FilePeers) Start() (err error) {
return nil
}

-func (p *FilePeers) Stop() error {
-return nil
-}

func (p *FilePeers) publicAddr() (string, error) {
addr := p.Cfg.GetPeerListenAddr()
host, port, err := net.SplitHostPort(addr)
internal/peer/mock.go (1 addition, 3 deletions)
@@ -26,6 +26,4 @@ func (p *MockPeers) Start() error {
return nil
}

-func (p *MockPeers) Stop() error {
-return nil
-}
+func (p *MockPeers) stop() {}
internal/peer/peers.go (1 deletion)
@@ -11,5 +11,4 @@ type Peers interface {
RegisterUpdatedPeersCallback(callback func())
// make it injectable
startstop.Starter
-startstop.Stopper
}
internal/peer/pubsub_redis.go (16 additions, 7 deletions)
@@ -69,18 +69,26 @@ func (p *peerCommand) marshal() string {
return string(p.action) + p.peer
}

+var _ Peers = (*RedisPubsubPeers)(nil)
+
type RedisPubsubPeers struct {
Config config.Config `inject:""`
Metrics metrics.Metrics `inject:"metrics"`
Logger logger.Logger `inject:""`
PubSub pubsub.PubSub `inject:""`
Clock clockwork.Clock `inject:""`
+
+// Done is a channel that will be closed when the service should stop.
+// After it is closed, the peers service should signal the rest of the cluster
+// that it is no longer available.
+// However, any messages sent on the peers channel will still be processed,
+// since the pubsub subscription is still active.
+Done chan struct{}

peers *generics.SetWithTTL[string]
hash uint64
callbacks []func()
sub pubsub.Subscription
-done chan struct{}
}

// checkHash checks the hash of the current list of peers and calls any registered callbacks
@@ -124,7 +132,6 @@ func (p *RedisPubsubPeers) Start() error {
p.Logger = &logger.NullLogger{}
}

-p.done = make(chan struct{})
p.peers = generics.NewSetWithTTL[string](PeerEntryTimeout)
p.callbacks = make([]func(), 0)
p.sub = p.PubSub.Subscribe(context.Background(), "peers", p.listen)
@@ -153,7 +160,8 @@ func (p *RedisPubsubPeers) Start() error {
defer logTicker.Stop()
for {
select {
-case <-p.done:
+case <-p.Done:
+p.stop()
return
case <-ticker.Chan():
// publish our presence periodically
@@ -174,15 +182,16 @@
return nil
}

-func (p *RedisPubsubPeers) Stop() error {
+// stop sends a message to the pubsub channel to unregister this peer,
+// but it does not close the subscription.
+func (p *RedisPubsubPeers) stop() {
// unregister ourselves
myaddr, err := p.publicAddr()
if err != nil {
-return err
+p.Logger.Error().Logf("failed to get public address")
+return
}
p.PubSub.Publish(context.Background(), "peers", newPeerCommand(Unregister, myaddr).marshal())
-close(p.done)
-return nil
}

func (p *RedisPubsubPeers) GetPeers() ([]string, error) {
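Note on the changes above: unregistration now happens inside the existing run loop rather than in a public Stop() method; when Done is closed, the select calls stop(), which publishes an Unregister command while the pubsub subscription stays open. The distilled sketch below illustrates that pattern with generic stand-in types, not refinery's actual API.

// Distilled sketch of the pattern introduced here: the run loop selects on a
// Done channel; when Done is closed it publishes one "unregister" message and
// returns, without tearing down the subscription. All names except Done are
// illustrative, not refinery's API.
package main

import (
	"fmt"
	"time"
)

type miniPeers struct {
	Done    chan struct{}
	publish func(msg string) // stands in for PubSub.Publish on the "peers" topic
	addr    string
}

func (p *miniPeers) start() {
	go func() {
		ticker := time.NewTicker(30 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-p.Done:
				// shutdown: tell the cluster right away, then stop the loop
				p.publish("unregister " + p.addr)
				return
			case <-ticker.C:
				// normal operation: keep announcing our presence
				p.publish("register " + p.addr)
			}
		}
	}()
}

func main() {
	done := make(chan struct{})
	p := &miniPeers{
		Done:    done,
		addr:    "10.0.0.1:8081",
		publish: func(m string) { fmt.Println(m) },
	}
	p.start()

	time.Sleep(100 * time.Millisecond) // a few register messages go out
	close(done)                        // the loop publishes "unregister" and exits
	time.Sleep(50 * time.Millisecond)  // give the goroutine time to finish
}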
sharder/deterministic_test.go (7 deletions)
@@ -34,7 +34,6 @@ func TestWhichShard(t *testing.T) {

filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
require.NoError(t, filePeers.Start())
-defer filePeers.Stop()

sharder := DeterministicSharder{
Config: config,
@@ -81,7 +80,6 @@ func TestWhichShardAtEdge(t *testing.T) {

filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
require.NoError(t, filePeers.Start())
-defer filePeers.Stop()

sharder := DeterministicSharder{
Config: config,
@@ -136,7 +134,6 @@ func BenchmarkShardBulk(b *testing.B) {

filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
require.NoError(b, filePeers.Start())
-defer filePeers.Stop()

sharder := DeterministicSharder{
Config: config,
@@ -186,7 +183,6 @@ func TestShardBulk(t *testing.T) {

filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
require.NoError(t, filePeers.Start())
-defer filePeers.Stop()

sharder := DeterministicSharder{
Config: config,
@@ -262,7 +258,6 @@ func TestShardDrop(t *testing.T) {

filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
require.NoError(t, filePeers.Start())
-defer filePeers.Stop()

sharder := DeterministicSharder{
Config: config,
@@ -349,7 +344,6 @@ func TestShardAddHash(t *testing.T) {

filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
require.NoError(t, filePeers.Start())
-defer filePeers.Stop()

sharder := DeterministicSharder{
Config: config,
@@ -435,7 +429,6 @@ func BenchmarkDeterministicShard(b *testing.B) {

filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
require.NoError(b, filePeers.Start())
-defer filePeers.Stop()

sharder := DeterministicSharder{
Config: config,