diff --git a/reader.go b/reader.go index 2d44fa58b..bc0f438a4 100644 --- a/reader.go +++ b/reader.go @@ -203,26 +203,20 @@ func (q *Reader) conns() []*Conn { return conns } - // ConnectionMaxInFlight calculates the per-connection max-in-flight count. // // This may change dynamically based on the number of connections to nsqd the Reader // is responsible for. func (q *Reader) ConnectionMaxInFlight() int64 { b := float64(q.MaxInFlight()) - q.RLock() - s := b / float64(len(q.connections)) - q.RUnlock() + s := b / float64(len(q.conns())) return int64(math.Min(math.Max(1, s), b)) } // IsStarved indicates whether any connections for this reader are blocked on processing // before being able to receive more messages (ie. RDY count of 0 and not exiting) func (q *Reader) IsStarved() bool { - q.RLock() - defer q.RUnlock() - - for _, conn := range q.connections { + for _, conn := range q.conns() { threshold := int64(float64(atomic.LoadInt64(&conn.lastRdyCount)) * 0.85) inFlight := atomic.LoadInt64(&conn.messagesInFlight) if inFlight >= threshold && inFlight > 0 && atomic.LoadInt32(&conn.stopFlag) != 1 { @@ -396,14 +390,14 @@ func (q *Reader) ConnectToNSQ(addr string) error { return errors.New("no handlers") } + _, pendingOk := q.pendingConnections[addr] q.RLock() _, ok := q.connections[addr] - _, pendingOk := q.pendingConnections[addr] + q.RUnlock() + if ok || pendingOk { - q.RUnlock() return ErrAlreadyConnected } - q.RUnlock() log.Printf("[%s] connecting to nsqd", addr) @@ -504,8 +498,8 @@ func (q *Reader) ConnectToNSQ(addr string) error { conn, q.TopicName, q.ChannelName, err.Error()) } - q.Lock() delete(q.pendingConnections, addr) + q.Lock() q.connections[addr] = conn q.Unlock() @@ -664,21 +658,20 @@ func (q *Reader) rdyLoop() { backoffTimerChan = nil atomic.StoreInt64(&q.backoffDuration, 0) - q.RLock() // pick a random connection to test the waters var i int - if len(q.connections) == 0 { + conns := q.conns() + if len(conns) == 0 { continue } - idx := rand.Intn(len(q.connections)) - for _, c := range q.connections { + idx := rand.Intn(len(conns)) + for _, c := range conns { if i == idx { choice = c break } i++ } - q.RUnlock() log.Printf("[%s] backoff time expired, continuing with RDY 1...", choice) // while in backoff only ever let 1 message at a time through @@ -841,9 +834,7 @@ func (q *Reader) redistributeRDY() { return } - q.RLock() - numConns := len(q.connections) - q.RUnlock() + numConns := len(q.conns()) maxInFlight := q.MaxInFlight() if numConns > maxInFlight { log.Printf("redistributing RDY state (%d conns > %d max_in_flight)", @@ -900,11 +891,7 @@ func (q *Reader) Stop() { log.Printf("stopping reader") - q.RLock() - l := len(q.connections) - q.RUnlock() - - if l == 0 { + if len(q.conns()) == 0 { q.stopHandlers() } else { for _, c := range q.conns() {