Skip to content

Commit

Permalink
fix(deadlock): block new client during drain
Browse files Browse the repository at this point in the history
  • Loading branch information
Youen committed Sep 24, 2021
1 parent 7d962e8 commit 0c81ac4
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions pkg/balancer/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,7 @@ func (b *Balancer) Start() {
}

func (b *Balancer) getStream() chan []byte {
log.Debug().Msg("RLock")

b.RLock()
defer b.RUnlock()

log.Debug().Msg("Get Current stream")
log.Debug().Msg("RUnLock")

return b.stream
}
Expand Down Expand Up @@ -333,10 +327,15 @@ func NewProducerHandler(balancer *Balancer, events chan<- Event, quit <-chan str
}

func (ph *ProducerHandler) handleRequest(conn net.Conn) {
log.Debug().Msg("RLock")
ph.balancer.RLock()

log.Info().Str("remote", conn.RemoteAddr().String()).Msg("New producer")
ph.events <- NewProducer

worker := ProducerWorker{conn: conn, stream: ph.balancer.getStream(), events: ph.events, quit: ph.quit}
ph.balancer.RUnlock()
log.Debug().Msg("RUnLock")
worker.Start()
}

Expand Down Expand Up @@ -386,10 +385,14 @@ func NewConsumerHandler(balancer *Balancer, events chan<- Event, quit <-chan str
}

func (ch *ConsumerHandler) handleRequest(conn net.Conn) {
log.Debug().Msg("RLock")
ch.balancer.RLock()

log.Info().Str("remote", conn.RemoteAddr().String()).Msg("New consumer")
ch.events <- NewConsumer

worker := ConsumerWorker{conn: conn, stream: ch.balancer.getStream(), events: ch.events, quit: ch.quit}

ch.balancer.RUnlock()
log.Debug().Msg("RUnLock")
worker.Start()
}

0 comments on commit 0c81ac4

Please sign in to comment.