fix: join peer list only after refinery is ready to accept traffic (#1309)

## Which problem is this PR solving?

Currently, Refinery announces its presence to peers immediately upon
startup. This causes the sharding algorithm to direct other peers to
forward traces to the newly added Refinery instance, even though it
might not be ready to process them. As a result, any spans forwarded
before the Refinery is fully operational are lost. This fix ensures that
Refinery only announces itself to peers once it is fully ready to accept
and process traffic, preventing any loss of spans during startup.

Alternatives:
I tried using the `health` module as a signal to the `peers` module that the
`router` is ready. However, due to the way the `injection` library works, the
`router` starts after the `peers` module. As a result, when `peers` checks in
with the health system, the `router` may not have registered itself yet, so
`IsReady` can return true prematurely, before the `router` is included in the
health checks.
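
To make the ordering problem concrete, here is a minimal, hypothetical sketch (the registry type and method names are assumptions, not Refinery's actual `health` API) of why a readiness check that only sees registered components can answer too early:

```go
package main

import (
	"fmt"
	"sync"
)

// healthRegistry is an illustrative stand-in (not Refinery's real health
// package): it only knows about components that have registered themselves.
type healthRegistry struct {
	mu    sync.Mutex
	ready map[string]bool
}

func (h *healthRegistry) Register(name string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.ready[name] = false
}

func (h *healthRegistry) SetReady(name string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.ready[name] = true
}

// IsReady reports whether every *registered* component is ready. A component
// that has not registered yet is invisible here, which is the source of the
// premature "ready" described above.
func (h *healthRegistry) IsReady() bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, ok := range h.ready {
		if !ok {
			return false
		}
	}
	return true
}

func main() {
	h := &healthRegistry{ready: map[string]bool{}}

	// If peers starts first (as the injection order dictates) and checks
	// readiness before the router registers, it gets a premature answer.
	fmt.Println("before router registers:", h.IsReady()) // true — premature

	h.Register("router")
	fmt.Println("after router registers:", h.IsReady()) // false until SetReady

	h.SetReady("router")
	fmt.Println("after router is ready:", h.IsReady()) // true — now meaningful
}
```

Because an empty or partially populated registry cannot distinguish "everything is ready" from "nothing has registered yet", gating the peer announcement on an explicit `Peers.Ready()` call from `main` sidesteps the ordering problem entirely.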

## Short description of the changes

- Add a `Ready` method on `Peers`
- Call `Peers.Ready()` once all components have started (the resulting startup order is sketched below)
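
As a rough sketch of the startup order this change produces (`startPipeline`, `startAllComponents`, and `noopPeers` are placeholder names; the real logic lives in `cmd/refinery/main.go` behind dependency injection):

```go
package main

import (
	"fmt"
	"os"
)

// peerManager mirrors the relevant slice of the Peers interface from
// internal/peer/peers.go (Start comes from startstop.Starter).
type peerManager interface {
	Start() error
	Ready() error
}

// startPipeline is a condensed, hypothetical view of the order main() now follows.
func startPipeline(peers peerManager, startAllComponents func() error) {
	// Start peer management (Redis pub/sub or a static peer file) without
	// announcing this instance to the cluster yet.
	if err := peers.Start(); err != nil {
		fmt.Printf("failed to start peer management: %v\n", err)
		os.Exit(1)
	}

	// Bring up the router, collector, and everything else that must be
	// running before any peer forwards spans to us.
	if err := startAllComponents(); err != nil {
		fmt.Printf("failed to start components: %v\n", err)
		os.Exit(1)
	}

	// Now that everything is up, announce ourselves so the sharding
	// algorithm starts directing traffic to this instance.
	if err := peers.Ready(); err != nil {
		fmt.Printf("failed to start peer management: %v\n", err)
		os.Exit(1)
	}
}

// noopPeers satisfies peerManager for the sake of a compilable example,
// much like MockPeers in internal/peer/mock.go.
type noopPeers struct{}

func (noopPeers) Start() error { return nil }
func (noopPeers) Ready() error { return nil }

func main() {
	startPipeline(noopPeers{}, func() error { return nil })
}
```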
VinozzZ authored Sep 4, 2024
1 parent 8e6413d commit 5c27d84
Showing 5 changed files with 26 additions and 0 deletions.
8 changes: 8 additions & 0 deletions cmd/refinery/main.go
@@ -298,6 +298,14 @@ func main() {
		os.Exit(1)
	}

	// Now that all components are started, we can notify our peers that we are ready
	// to receive data.
	err = peers.Ready()
	if err != nil {
		fmt.Printf("failed to start peer management: %v\n", err)
		os.Exit(1)
	}

	// these have to be done after the injection (of metrics)
	// these are the metrics that libhoney will emit; we preregister them so that they always appear
	libhoneyMetricsName := map[string]string{
4 changes: 4 additions & 0 deletions internal/peer/file.go
@@ -56,6 +56,10 @@ func (p *FilePeers) Start() (err error) {
	return nil
}

func (p *FilePeers) Ready() error {
	return nil
}

func (p *FilePeers) publicAddr() (string, error) {
	addr := p.Cfg.GetPeerListenAddr()
	host, port, err := net.SplitHostPort(addr)
4 changes: 4 additions & 0 deletions internal/peer/mock.go
@@ -26,4 +26,8 @@ func (p *MockPeers) Start() error {
	return nil
}

func (p *MockPeers) Ready() error {
	return nil
}

func (p *MockPeers) stop() {}
1 change: 1 addition & 0 deletions internal/peer/peers.go
@@ -9,6 +9,7 @@ type Peers interface {
	GetPeers() ([]string, error)
	GetInstanceID() (string, error)
	RegisterUpdatedPeersCallback(callback func())
	Ready() error
	// make it injectable
	startstop.Starter
}
9 changes: 9 additions & 0 deletions internal/peer/pubsub_redis.go
@@ -144,7 +144,15 @@ func (p *RedisPubsubPeers) Start() error {
	if err != nil {
		return err
	}
	p.peers.Add(myaddr)
	return nil
}

func (p *RedisPubsubPeers) Ready() error {
	myaddr, err := p.publicAddr()
	if err != nil {
		return err
	}
	// periodically refresh our presence in the list of peers, and update peers as they come in
	go func() {
		// we want our refresh cache interval to vary from peer to peer so they
@@ -164,6 +172,7 @@ func (p *RedisPubsubPeers) Start() error {
				p.stop()
				return
			case <-ticker.Chan():

				// publish our presence periodically
				ctx, cancel := context.WithTimeout(context.Background(), p.Config.GetPeerTimeout())
				err := p.PubSub.Publish(ctx, "peers", newPeerCommand(Register, myaddr).marshal())
