Skip to content

Commit

Permalink
Merge pull request nsqio#33 from mreiferson/granular_locks_33
Browse files Browse the repository at this point in the history
reader: more granular connection locks
  • Loading branch information
jehiah committed May 13, 2014
2 parents aba5a64 + 357752c commit 269271a
Showing 1 changed file with 19 additions and 19 deletions.
38 changes: 19 additions & 19 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,17 @@ func NewReader(topic string, channel string) (*Reader, error) {
return q, nil
}

func (q *Reader) conns() []*Conn {
q.RLock()
conns := make([]*Conn, 0, len(q.connections))
for _, c := range q.connections {
conns = append(conns, c)
}
q.RUnlock()
return conns
}


// ConnectionMaxInFlight calculates the per-connection max-in-flight count.
//
// This may change dynamically based on the number of connections to nsqd the Reader
Expand Down Expand Up @@ -238,9 +249,7 @@ func (q *Reader) SetMaxInFlight(maxInFlight int) {
q.maxInFlight = maxInFlight
q.maxInFlightMutex.Unlock()

q.RLock()
defer q.RUnlock()
for _, c := range q.connections {
for _, c := range q.conns() {
q.rdyChan <- c
}
}
Expand Down Expand Up @@ -501,11 +510,9 @@ func (q *Reader) ConnectToNSQ(addr string) error {
q.Unlock()

// pre-emptive signal to existing connections to lower their RDY count
q.RLock()
for _, c := range q.connections {
for _, c := range q.conns() {
q.rdyChan <- c
}
q.RUnlock()

return nil
}
Expand Down Expand Up @@ -727,14 +734,12 @@ func (q *Reader) rdyLoop() {
// exit backoff
if backoffCounter == 0 && backoffUpdated {
count := q.ConnectionMaxInFlight()
q.RLock()
for _, c := range q.connections {
for _, c := range q.conns() {
if q.VerboseLogging {
log.Printf("[%s] exiting backoff. returning to RDY %d", c, count)
}
q.updateRDY(c, count)
}
q.RUnlock()
continue
}

Expand All @@ -749,14 +754,12 @@ func (q *Reader) rdyLoop() {
backoffDuration.Seconds(), backoffCounter)

// send RDY 0 immediately (to *all* connections)
q.RLock()
for _, c := range q.connections {
for _, c := range q.conns() {
if q.VerboseLogging {
log.Printf("[%s] in backoff. sending RDY 0", c)
}
q.updateRDY(c, 0)
}
q.RUnlock()
}
case <-redistributeTicker.C:
q.redistributeRDY()
Expand Down Expand Up @@ -857,9 +860,9 @@ func (q *Reader) redistributeRDY() {
return
}

q.RLock()
possibleConns := make([]*Conn, 0, len(q.connections))
for _, c := range q.connections {
conns := q.conns()
possibleConns := make([]*Conn, 0, len(conns))
for _, c := range conns {
lastMsgDuration := time.Now().Sub(c.LastMessageTime())
rdyCount := c.RDY()
if q.VerboseLogging {
Expand Down Expand Up @@ -887,7 +890,6 @@ func (q *Reader) redistributeRDY() {
log.Printf("[%s] redistributing RDY", c)
q.updateRDY(c, 1)
}
q.RUnlock()
}

// Stop will gracefully stop the Reader
Expand All @@ -905,15 +907,13 @@ func (q *Reader) Stop() {
if l == 0 {
q.stopHandlers()
} else {
q.RLock()
for _, c := range q.connections {
for _, c := range q.conns() {
err := c.SendCommand(StartClose())
if err != nil {
log.Printf("[%s] failed to start close - %s", c, err.Error())
c.Stop()
}
}
q.RUnlock()

time.AfterFunc(time.Second*30, func() {
q.stopHandlers()
Expand Down

0 comments on commit 269271a

Please sign in to comment.