Skip to content

Commit

Permalink
set memoryMsgChan as nil when -mem-queue-size=0
Browse files Browse the repository at this point in the history
  • Loading branch information
bitpeng committed Jun 5, 2019
1 parent 223e97f commit 082679f
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 2 deletions.
9 changes: 8 additions & 1 deletion nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,18 @@ func NewChannel(topicName string, channelName string, ctx *context,
c := &Channel{
topicName: topicName,
name: channelName,
memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
clients: make(map[int64]Consumer),
deleteCallback: deleteCallback,
ctx: ctx,
}
// Note: generally, we intend to save all the message in the disk-queue
// when set `-mem-queue-size=0` option, but `make(chan *Message, 0)` just
// create a unbuffered chan which can also send/receive message as well.
if ctx.nsqd.getOpts().MemQueueSize > 0 {
c.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize)
} else {
c.memoryMsgChan = nil
}
if len(ctx.nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 {
c.e2eProcessingLatencyStream = quantile.New(
ctx.nsqd.getOpts().E2EProcessingLatencyWindowTime,
Expand Down
9 changes: 8 additions & 1 deletion nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi
t := &Topic{
name: topicName,
channelMap: make(map[string]*Channel),
memoryMsgChan: make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
startChan: make(chan int, 1),
exitChan: make(chan int),
channelUpdateChan: make(chan int),
Expand All @@ -57,6 +56,14 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi
deleteCallback: deleteCallback,
idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID),
}
// Note: generally, we intend to save all the message in the disk-queue
// when set `-mem-queue-size=0` option, but `make(chan *Message, 0)` just
// create a unbuffered chan which can also send/receive message as well.
if ctx.nsqd.getOpts().MemQueueSize > 0 {
t.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize)
} else {
t.memoryMsgChan = nil
}

if strings.HasSuffix(topicName, "#ephemeral") {
t.ephemeral = true
Expand Down

0 comments on commit 082679f

Please sign in to comment.