Skip to content

Commit

Permalink
Merge pull request nsqio#34 from mreiferson/fixes_34
Browse files Browse the repository at this point in the history
more misc fixes
  • Loading branch information
Justin Hines committed May 14, 2014
2 parents 269271a + 81bb0f7 commit 8de34ea
Showing 1 changed file with 12 additions and 25 deletions.
37 changes: 12 additions & 25 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,26 +203,20 @@ func (q *Reader) conns() []*Conn {
return conns
}


// ConnectionMaxInFlight calculates the per-connection max-in-flight count.
//
// This may change dynamically based on the number of connections to nsqd the Reader
// is responsible for.
func (q *Reader) ConnectionMaxInFlight() int64 {
b := float64(q.MaxInFlight())
q.RLock()
s := b / float64(len(q.connections))
q.RUnlock()
s := b / float64(len(q.conns()))
return int64(math.Min(math.Max(1, s), b))
}

// IsStarved indicates whether any connections for this reader are blocked on processing
// before being able to receive more messages (ie. RDY count of 0 and not exiting)
func (q *Reader) IsStarved() bool {
q.RLock()
defer q.RUnlock()

for _, conn := range q.connections {
for _, conn := range q.conns() {
threshold := int64(float64(atomic.LoadInt64(&conn.lastRdyCount)) * 0.85)
inFlight := atomic.LoadInt64(&conn.messagesInFlight)
if inFlight >= threshold && inFlight > 0 && atomic.LoadInt32(&conn.stopFlag) != 1 {
Expand Down Expand Up @@ -396,14 +390,14 @@ func (q *Reader) ConnectToNSQ(addr string) error {
return errors.New("no handlers")
}

_, pendingOk := q.pendingConnections[addr]
q.RLock()
_, ok := q.connections[addr]
_, pendingOk := q.pendingConnections[addr]
q.RUnlock()

if ok || pendingOk {
q.RUnlock()
return ErrAlreadyConnected
}
q.RUnlock()

log.Printf("[%s] connecting to nsqd", addr)

Expand Down Expand Up @@ -504,8 +498,8 @@ func (q *Reader) ConnectToNSQ(addr string) error {
conn, q.TopicName, q.ChannelName, err.Error())
}

q.Lock()
delete(q.pendingConnections, addr)
q.Lock()
q.connections[addr] = conn
q.Unlock()

Expand Down Expand Up @@ -664,21 +658,20 @@ func (q *Reader) rdyLoop() {
backoffTimerChan = nil
atomic.StoreInt64(&q.backoffDuration, 0)

q.RLock()
// pick a random connection to test the waters
var i int
if len(q.connections) == 0 {
conns := q.conns()
if len(conns) == 0 {
continue
}
idx := rand.Intn(len(q.connections))
for _, c := range q.connections {
idx := rand.Intn(len(conns))
for _, c := range conns {
if i == idx {
choice = c
break
}
i++
}
q.RUnlock()

log.Printf("[%s] backoff time expired, continuing with RDY 1...", choice)
// while in backoff only ever let 1 message at a time through
Expand Down Expand Up @@ -841,9 +834,7 @@ func (q *Reader) redistributeRDY() {
return
}

q.RLock()
numConns := len(q.connections)
q.RUnlock()
numConns := len(q.conns())
maxInFlight := q.MaxInFlight()
if numConns > maxInFlight {
log.Printf("redistributing RDY state (%d conns > %d max_in_flight)",
Expand Down Expand Up @@ -900,11 +891,7 @@ func (q *Reader) Stop() {

log.Printf("stopping reader")

q.RLock()
l := len(q.connections)
q.RUnlock()

if l == 0 {
if len(q.conns()) == 0 {
q.stopHandlers()
} else {
for _, c := range q.conns() {
Expand Down

0 comments on commit 8de34ea

Please sign in to comment.