diff --git a/pkg/balancer/model.go b/pkg/balancer/model.go index 68152d9..3721dd5 100644 --- a/pkg/balancer/model.go +++ b/pkg/balancer/model.go @@ -244,13 +244,7 @@ func (b *Balancer) Start() { } func (b *Balancer) getStream() chan []byte { - log.Debug().Msg("RLock") - - b.RLock() - defer b.RUnlock() - log.Debug().Msg("Get Current stream") - log.Debug().Msg("RUnLock") return b.stream } @@ -333,10 +327,15 @@ func NewProducerHandler(balancer *Balancer, events chan<- Event, quit <-chan str } func (ph *ProducerHandler) handleRequest(conn net.Conn) { + log.Debug().Msg("RLock") + ph.balancer.RLock() + log.Info().Str("remote", conn.RemoteAddr().String()).Msg("New producer") ph.events <- NewProducer worker := ProducerWorker{conn: conn, stream: ph.balancer.getStream(), events: ph.events, quit: ph.quit} + ph.balancer.RUnlock() + log.Debug().Msg("RUnLock") worker.Start() } @@ -386,10 +385,14 @@ func NewConsumerHandler(balancer *Balancer, events chan<- Event, quit <-chan str } func (ch *ConsumerHandler) handleRequest(conn net.Conn) { + log.Debug().Msg("RLock") + ch.balancer.RLock() + log.Info().Str("remote", conn.RemoteAddr().String()).Msg("New consumer") ch.events <- NewConsumer worker := ConsumerWorker{conn: conn, stream: ch.balancer.getStream(), events: ch.events, quit: ch.quit} - + ch.balancer.RUnlock() + log.Debug().Msg("RUnLock") worker.Start() }