diff --git a/cmd/refinery/main.go b/cmd/refinery/main.go index 0f33a28d5b..24b45b3da6 100644 --- a/cmd/refinery/main.go +++ b/cmd/refinery/main.go @@ -298,6 +298,14 @@ func main() { os.Exit(1) } + // Now that all components are started, we can notify our peers that we are ready + // to receive data. + err = peers.Ready() + if err != nil { + fmt.Printf("failed to start peer management: %v\n", err) + os.Exit(1) + } + // these have to be done after the injection (of metrics) // these are the metrics that libhoney will emit; we preregister them so that they always appear libhoneyMetricsName := map[string]string{ diff --git a/internal/peer/file.go b/internal/peer/file.go index 06452bd3c2..b995183152 100644 --- a/internal/peer/file.go +++ b/internal/peer/file.go @@ -56,6 +56,10 @@ func (p *FilePeers) Start() (err error) { return nil } +func (p *FilePeers) Ready() error { + return nil +} + func (p *FilePeers) publicAddr() (string, error) { addr := p.Cfg.GetPeerListenAddr() host, port, err := net.SplitHostPort(addr) diff --git a/internal/peer/mock.go b/internal/peer/mock.go index 9cf42abdd1..ff9e3c7462 100644 --- a/internal/peer/mock.go +++ b/internal/peer/mock.go @@ -26,4 +26,8 @@ func (p *MockPeers) Start() error { return nil } +func (p *MockPeers) Ready() error { + return nil +} + func (p *MockPeers) stop() {} diff --git a/internal/peer/peers.go b/internal/peer/peers.go index 0eadee5293..86d365fabf 100644 --- a/internal/peer/peers.go +++ b/internal/peer/peers.go @@ -9,6 +9,7 @@ type Peers interface { GetPeers() ([]string, error) GetInstanceID() (string, error) RegisterUpdatedPeersCallback(callback func()) + Ready() error // make it injectable startstop.Starter } diff --git a/internal/peer/pubsub_redis.go b/internal/peer/pubsub_redis.go index 0441a1040e..9287883cb1 100644 --- a/internal/peer/pubsub_redis.go +++ b/internal/peer/pubsub_redis.go @@ -144,7 +144,15 @@ func (p *RedisPubsubPeers) Start() error { if err != nil { return err } + p.peers.Add(myaddr) + return nil +} +func (p *RedisPubsubPeers) Ready() error { + myaddr, err := p.publicAddr() + if err != nil { + return err + } // periodically refresh our presence in the list of peers, and update peers as they come in go func() { // we want our refresh cache interval to vary from peer to peer so they @@ -164,6 +172,7 @@ func (p *RedisPubsubPeers) Start() error { p.stop() return case <-ticker.Chan(): + // publish our presence periodically ctx, cancel := context.WithTimeout(context.Background(), p.Config.GetPeerTimeout()) err := p.PubSub.Publish(ctx, "peers", newPeerCommand(Register, myaddr).marshal())