nsqd: questions for large number of topics/channels #577

Merged 3 commits on May 3, 2015
22 changes: 22 additions & 0 deletions internal/util/rand.go
@@ -0,0 +1,22 @@
package util

import (
	"math/rand"
)

func UniqRands(l int, n int) []int {
	set := make(map[int]struct{})
	nums := make([]int, 0, l)
	for {
		num := rand.Intn(n)
		if _, ok := set[num]; !ok {
			set[num] = struct{}{}
			nums = append(nums, num)
		}
		if len(nums) == l {
			goto exit
		}
	}
exit:
	return nums
}
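
For reference, UniqRands returns l distinct pseudo-random ints in the range [0, n), so callers must keep l <= n. A minimal usage sketch, not part of this diff, assuming it lives inside the nsq source tree (internal packages are not importable from outside it):

package main

import (
	"fmt"

	"github.com/bitly/nsq/internal/util"
)

func main() {
	channels := []string{"a", "b", "c", "d", "e"}
	// pick 3 distinct indices into channels; no index is repeated
	for _, i := range util.UniqRands(3, len(channels)) {
		fmt.Println(channels[i])
	}
}
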
128 changes: 54 additions & 74 deletions nsqd/channel.go
@@ -12,12 +12,8 @@ import (

"github.com/bitly/nsq/internal/pqueue"
"github.com/bitly/nsq/internal/quantile"
"github.com/bitly/nsq/internal/util"
)

// the amount of time a worker will wait when idle
const defaultWorkerWait = 100 * time.Millisecond

type Consumer interface {
UnPause()
Pause()
@@ -52,8 +48,8 @@ type Channel struct {
memoryMsgChan chan *Message
clientMsgChan chan *Message
exitChan chan int
waitGroup util.WaitGroupWrapper
exitFlag int32
exitMutex sync.RWMutex

// state tracking
clients map[int64]Consumer
@@ -116,9 +112,6 @@ func NewChannel(topicName string, channelName string, ctx *context,

go c.messagePump()

c.waitGroup.Wrap(func() { c.deferredWorker() })
c.waitGroup.Wrap(func() { c.inFlightWorker() })

c.ctx.nsqd.Notify(c)

return c
@@ -155,6 +148,9 @@ func (c *Channel) Close() error {
}

func (c *Channel) exit(deleted bool) error {
c.exitMutex.Lock()
defer c.exitMutex.Unlock()

if !atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) {
return errors.New("exiting")
}
@@ -178,9 +174,6 @@ func (c *Channel) exit(deleted bool) error {

close(c.exitChan)

// synchronize the close of router() and pqWorkers (2)
c.waitGroup.Wait()

if deleted {
// empty the queue (deletes the backend files, too)
c.Empty()
@@ -226,7 +219,7 @@ func (c *Channel) flush() error {
var msgBuf bytes.Buffer

// messagePump is responsible for closing the channel it writes to
// this will read until its closed (exited)
// this will read until it's closed (exited)
for msg := range c.clientMsgChan {
c.ctx.nsqd.logf("CHANNEL(%s): recovered buffered message from clientMsgChan", c.name)
writeMessageToBackend(&msgBuf, msg, c.backend)
@@ -544,11 +537,8 @@ func (c *Channel) addToDeferredPQ(item *pqueue.Item) {
heap.Push(&c.deferredPQ, item)
}

// messagePump reads messages from either memory or backend and writes
// to the client output go channel
//
// it is also performs in-flight accounting and initiates the auto-requeue
// goroutine
// messagePump reads messages from either memory or backend and sends
// messages to clients over a go chan
func (c *Channel) messagePump() {
var msg *Message
var buf []byte
@@ -587,80 +577,70 @@ exit:
close(c.clientMsgChan)
}

func (c *Channel) deferredWorker() {
c.pqWorker(&c.deferredPQ, &c.deferredMutex, func(item *pqueue.Item) {
msg := item.Value.(*Message)
_, err := c.popDeferredMessage(msg.ID)
if err != nil {
return
}
c.doRequeue(msg)
})
}
func (c *Channel) processDeferredQueue(t int64) bool {
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()

if c.Exiting() {
return false
}

func (c *Channel) inFlightWorker() {
ticker := time.NewTicker(defaultWorkerWait)
dirty := false
for {
select {
case <-ticker.C:
case <-c.exitChan:
c.deferredMutex.Lock()
item, _ := c.deferredPQ.PeekAndShift(t)
c.deferredMutex.Unlock()

if item == nil {
goto exit
}
now := time.Now().UnixNano()
for {
c.inFlightMutex.Lock()
msg, _ := c.inFlightPQ.PeekAndShift(now)
c.inFlightMutex.Unlock()

if msg == nil {
break
}
dirty = true

_, err := c.popInFlightMessage(msg.clientID, msg.ID)
if err != nil {
break
}
atomic.AddUint64(&c.timeoutCount, 1)
c.RLock()
client, ok := c.clients[msg.clientID]
c.RUnlock()
if ok {
client.TimedOutMessage()
}
c.doRequeue(msg)
msg := item.Value.(*Message)
_, err := c.popDeferredMessage(msg.ID)
if err != nil {
goto exit
}
c.doRequeue(msg)
}

exit:
c.ctx.nsqd.logf("CHANNEL(%s): closing ... inFlightWorker", c.name)
ticker.Stop()
return dirty
}

// generic loop (executed in a goroutine) that periodically wakes up to walk
// the priority queue and call the callback
func (c *Channel) pqWorker(pq *pqueue.PriorityQueue, mutex *sync.Mutex, callback func(item *pqueue.Item)) {
ticker := time.NewTicker(defaultWorkerWait)
func (c *Channel) processInFlightQueue(t int64) bool {
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()

if c.Exiting() {
return false
}

dirty := false
for {
select {
case <-ticker.C:
case <-c.exitChan:
c.inFlightMutex.Lock()
msg, _ := c.inFlightPQ.PeekAndShift(t)
c.inFlightMutex.Unlock()

if msg == nil {
goto exit
}
now := time.Now().UnixNano()
for {
mutex.Lock()
item, _ := pq.PeekAndShift(now)
mutex.Unlock()

if item == nil {
break
}
dirty = true

callback(item)
_, err := c.popInFlightMessage(msg.clientID, msg.ID)
if err != nil {
goto exit
}
atomic.AddUint64(&c.timeoutCount, 1)
c.RLock()
client, ok := c.clients[msg.clientID]
c.RUnlock()
if ok {
client.TimedOutMessage()
}
c.doRequeue(msg)
}

exit:
c.ctx.nsqd.logf("CHANNEL(%s): closing ... pqueue worker", c.name)
ticker.Stop()
return dirty
}
1 change: 1 addition & 0 deletions nsqd/channel_test.go
@@ -60,6 +60,7 @@ func TestInFlightWorker(t *testing.T) {
	opts := NewNSQDOptions()
	opts.Logger = newTestLogger(t)
	opts.MsgTimeout = 100 * time.Millisecond
	opts.QueueScanRefreshInterval = 100 * time.Millisecond
	_, _, nsqd := mustStartNSQD(opts)
	defer os.RemoveAll(opts.DataPath)
	defer nsqd.Exit()
134 changes: 134 additions & 0 deletions nsqd/nsqd.go
@@ -59,6 +59,8 @@ type NSQD struct {
httpsListener net.Listener
tlsConfig *tls.Config

poolSize int

idChan chan MessageID
notifyChan chan interface{}
exitChan chan int
@@ -245,6 +247,7 @@ func (n *NSQD) Main() {
http_api.Serve(n.httpListener, httpServer, n.opts.Logger, "HTTP")
})

n.waitGroup.Wrap(func() { n.queueScanLoop() })
n.waitGroup.Wrap(func() { n.idPump() })
n.waitGroup.Wrap(func() { n.lookupLoop() })
if n.opts.StatsdAddress != "" {
@@ -551,6 +554,137 @@ func (n *NSQD) Notify(v interface{}) {
})
}

// channels returns a flat slice of all channels in all topics
func (n *NSQD) channels() []*Channel {
	var channels []*Channel
	n.RLock()
	for _, t := range n.topicMap {
		t.RLock()
		for _, c := range t.channelMap {
			channels = append(channels, c)
		}
		t.RUnlock()
	}
	n.RUnlock()
	return channels
}

// resizePool adjusts the size of the pool of queueScanWorker goroutines
//
// 1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
//
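// For example, with the default QueueScanWorkerPoolMax of 4, scanning 100
// channels settles at min(25, 4) = 4 workers, while 2 channels settles at
// the floor of 1 worker.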
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	idealPoolSize := int(float64(num) * 0.25)
	if idealPoolSize < 1 {
		idealPoolSize = 1
	} else if idealPoolSize > n.opts.QueueScanWorkerPoolMax {
		idealPoolSize = n.opts.QueueScanWorkerPoolMax
	}
	for {
		if idealPoolSize == n.poolSize {
			break
		} else if idealPoolSize < n.poolSize {
			// contract
			closeCh <- 1
			n.poolSize--
		} else {
			// expand
			n.waitGroup.Wrap(func() {
				n.queueScanWorker(workCh, responseCh, closeCh)
			})
			n.poolSize++
		}
	}
}

// queueScanWorker receives work (in the form of a channel) from queueScanLoop
// and processes the deferred and in-flight queues
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	for {
		select {
		case c := <-workCh:
			now := time.Now().UnixNano()
			dirty := false
			if c.processInFlightQueue(now) {
				dirty = true
			}
			if c.processDeferredQueue(now) {
				dirty = true
			}
			responseCh <- dirty
		case <-closeCh:
			return
		}
	}
}

// queueScanLoop runs in a single goroutine to process in-flight and deferred
// priority queues. It manages a pool of queueScanWorker (configurable max of
// QueueScanWorkerPoolMax (default: 4)) that process channels concurrently.
//
// It copies Redis's probabilistic expiration algorithm: it wakes up every
// QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount
// (default: 20) channels from a locally cached list (refreshed every
// QueueScanRefreshInterval (default: 5s)).
//
// If either of the queues had work to do the channel is considered "dirty".
//
// If QueueScanDirtyPercent (default: 25%) of the selected channels were dirty,
// the loop continues without sleep.
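//
// As a worked example with the defaults: every 100ms, 20 randomly chosen
// channels are handed to the worker pool; if more than 5 of them (i.e. over
// 25%) report work done, another 20 are selected immediately instead of
// waiting for the next tick.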
func (n *NSQD) queueScanLoop() {
	workCh := make(chan *Channel, n.opts.QueueScanSelectionCount)
	responseCh := make(chan bool, n.opts.QueueScanSelectionCount)
	closeCh := make(chan int)

	workTicker := time.NewTicker(n.opts.QueueScanInterval)
	refreshTicker := time.NewTicker(n.opts.QueueScanRefreshInterval)

	channels := n.channels()
	n.resizePool(len(channels), workCh, responseCh, closeCh)

	for {
		select {
		case <-workTicker.C:
			if len(channels) == 0 {
				continue
			}
		case <-refreshTicker.C:
			channels = n.channels()
			n.resizePool(len(channels), workCh, responseCh, closeCh)
			continue
		case <-n.exitChan:
			goto exit
		}

		num := n.opts.QueueScanSelectionCount
		if num > len(channels) {
			num = len(channels)
		}
Member Author: another note for myself: this doesn't have to be in the loop block.

	loop:
		for _, i := range util.UniqRands(num, len(channels)) {
			workCh <- channels[i]
		}

		numDirty := 0
		for i := 0; i < num; i++ {
			if <-responseCh {

Member Author: making a note for myself: This is insufficient in the case where we asynchronously get a signal from n.exitChan to close... there's definitely a case where this would deadlock. We should probably select on n.exitChan here, too.

Member Author: this was resolved (a sketch of the suggested guard follows queueScanLoop below).

				numDirty++
			}
		}

		if float64(numDirty)/float64(num) > n.opts.QueueScanDirtyPercent {
			goto loop
		}
	}

exit:
	n.logf("QUEUESCAN: closing")
	close(closeCh)
	workTicker.Stop()
	refreshTicker.Stop()
}
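
The review note above suggested also selecting on n.exitChan while draining responseCh, so that shutdown cannot stall waiting for worker responses that never arrive. A minimal sketch of that guard, factored into a hypothetical helper (drainResponses is an illustrative name, not part of this diff, and not necessarily how the merged code resolved it):

// drainResponses reads up to num worker responses, counting the dirty ones,
// but gives up as soon as exitChan fires so the caller can shut down cleanly.
func drainResponses(num int, responseCh chan bool, exitChan chan int) int {
	numDirty := 0
	for i := 0; i < num; i++ {
		select {
		case dirty := <-responseCh:
			if dirty {
				numDirty++
			}
		case <-exitChan:
			return numDirty
		}
	}
	return numDirty
}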

func buildTLSConfig(opts *nsqdOptions) (*tls.Config, error) {
var tlsConfig *tls.Config
