Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: don't retain client pointer for inflight msgs #242

Merged
merged 1 commit into from
Aug 5, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 32 additions & 39 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Channel struct {
exitFlag int32

// state tracking
clients []Consumer
clients map[int64]Consumer
paused int32
ephemeralChannel bool
deleteCallback func(*Channel)
Expand All @@ -74,9 +74,9 @@ type Channel struct {
}

type inFlightMessage struct {
msg *nsq.Message
client Consumer
ts time.Time
msg *nsq.Message
clientID int64
ts time.Time
}

// NewChannel creates a new instance of the Channel type and returns a pointer
Expand All @@ -89,7 +89,7 @@ func NewChannel(topicName string, channelName string, context *Context,
memoryMsgChan: make(chan *nsq.Message, context.nsqd.options.memQueueSize),
clientMsgChan: make(chan *nsq.Message),
exitChan: make(chan int),
clients: make([]Consumer, 0, 5),
clients: make(map[int64]Consumer),
deleteCallback: deleteCallback,
context: context,
}
Expand Down Expand Up @@ -308,8 +308,8 @@ func (c *Channel) PutMessage(msg *nsq.Message) error {
}

// TouchMessage resets the timeout for an in-flight message
func (c *Channel) TouchMessage(client Consumer, id nsq.MessageID) error {
item, err := c.popInFlightMessage(client, id)
func (c *Channel) TouchMessage(clientID int64, id nsq.MessageID) error {
item, err := c.popInFlightMessage(clientID, id)
if err != nil {
return err
}
Expand All @@ -333,8 +333,8 @@ func (c *Channel) TouchMessage(client Consumer, id nsq.MessageID) error {
}

// FinishMessage successfully discards an in-flight message
func (c *Channel) FinishMessage(client Consumer, id nsq.MessageID) error {
item, err := c.popInFlightMessage(client, id)
func (c *Channel) FinishMessage(clientID int64, id nsq.MessageID) error {
item, err := c.popInFlightMessage(clientID, id)
if err != nil {
return err
}
Expand All @@ -348,9 +348,9 @@ func (c *Channel) FinishMessage(client Consumer, id nsq.MessageID) error {
// `timeoutMs` > 0 - asynchronously wait for the specified timeout
// and requeue a message (aka "deferred requeue")
//
func (c *Channel) RequeueMessage(client Consumer, id nsq.MessageID, timeout time.Duration) error {
func (c *Channel) RequeueMessage(clientID int64, id nsq.MessageID, timeout time.Duration) error {
// remove from inflight first
item, err := c.popInFlightMessage(client, id)
item, err := c.popInFlightMessage(clientID, id)
if err != nil {
return err
}
Expand All @@ -367,46 +367,36 @@ func (c *Channel) RequeueMessage(client Consumer, id nsq.MessageID, timeout time
}

// AddClient adds a client to the Channel's client list
func (c *Channel) AddClient(client Consumer) {
func (c *Channel) AddClient(clientID int64, client Consumer) {
c.Lock()
defer c.Unlock()

found := false
for _, cli := range c.clients {
if cli == client {
found = true
break
}
}

if !found {
c.clients = append(c.clients, client)
_, ok := c.clients[clientID]
if ok {
return
}
c.clients[clientID] = client
}

// RemoveClient removes a client from the Channel's client list
func (c *Channel) RemoveClient(client Consumer) {
func (c *Channel) RemoveClient(clientID int64) {
c.Lock()
defer c.Unlock()

if len(c.clients) != 0 {
finalClients := make([]Consumer, 0, len(c.clients)-1)
for _, cli := range c.clients {
if cli != client {
finalClients = append(finalClients, cli)
}
}
c.clients = finalClients
_, ok := c.clients[clientID]
if !ok {
return
}
delete(c.clients, clientID)

if len(c.clients) == 0 && c.ephemeralChannel == true {
go c.deleter.Do(func() { c.deleteCallback(c) })
}
}

func (c *Channel) StartInFlightTimeout(msg *nsq.Message, client Consumer) error {
func (c *Channel) StartInFlightTimeout(msg *nsq.Message, clientID int64) error {
now := time.Now()
value := &inFlightMessage{msg, client, now}
value := &inFlightMessage{msg, clientID, now}
absTs := now.Add(c.context.nsqd.options.msgTimeout).UnixNano()
item := &pqueue.Item{Value: value, Priority: absTs}
err := c.pushInFlightMessage(item)
Expand Down Expand Up @@ -456,7 +446,7 @@ func (c *Channel) pushInFlightMessage(item *pqueue.Item) error {
}

// popInFlightMessage atomically removes a message from the in-flight dictionary
func (c *Channel) popInFlightMessage(client Consumer, id nsq.MessageID) (*pqueue.Item, error) {
func (c *Channel) popInFlightMessage(clientID int64, id nsq.MessageID) (*pqueue.Item, error) {
c.Lock()
defer c.Unlock()

Expand All @@ -465,8 +455,8 @@ func (c *Channel) popInFlightMessage(client Consumer, id nsq.MessageID) (*pqueue
return nil, errors.New("ID not in flight")
}

if item.Value.(*inFlightMessage).client != client {
return nil, errors.New("client does not own ID")
if item.Value.(*inFlightMessage).clientID != clientID {
return nil, errors.New("client does not own message")
}

delete(c.inFlightMessages, id)
Expand Down Expand Up @@ -605,14 +595,17 @@ func (c *Channel) deferredWorker() {

func (c *Channel) inFlightWorker() {
c.pqWorker(&c.inFlightPQ, &c.inFlightMutex, func(item *pqueue.Item) {
client := item.Value.(*inFlightMessage).client
clientID := item.Value.(*inFlightMessage).clientID
msg := item.Value.(*inFlightMessage).msg
_, err := c.popInFlightMessage(client, msg.Id)
_, err := c.popInFlightMessage(clientID, msg.Id)
if err != nil {
return
}
atomic.AddUint64(&c.timeoutCount, 1)
client.TimedOutMessage()
client, ok := c.clients[clientID]
if ok {
client.TimedOutMessage()
}
c.doRequeue(msg)
})
}
Expand Down
13 changes: 6 additions & 7 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestInFlightWorker(t *testing.T) {

for i := 0; i < 1000; i++ {
msg := nsq.NewMessage(<-nsqd.idChan, []byte("test"))
channel.StartInFlightTimeout(msg, NewClientV2(&Context{nsqd}, nil))
channel.StartInFlightTimeout(msg, 0)
}

assert.Equal(t, len(channel.inFlightMessages), 1000)
Expand All @@ -97,16 +97,15 @@ func TestChannelEmpty(t *testing.T) {
topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel")
client := NewClientV2(&Context{nsqd}, nil)

msgs := make([]*nsq.Message, 0, 25)
for i := 0; i < 25; i++ {
msg := nsq.NewMessage(<-nsqd.idChan, []byte("test"))
channel.StartInFlightTimeout(msg, client)
channel.StartInFlightTimeout(msg, 0)
msgs = append(msgs, msg)
}

channel.RequeueMessage(client, msgs[len(msgs)-1].Id, 100*time.Millisecond)
channel.RequeueMessage(0, msgs[len(msgs)-1].Id, 100*time.Millisecond)
assert.Equal(t, len(channel.inFlightMessages), 24)
assert.Equal(t, len(channel.inFlightPQ), 24)
assert.Equal(t, len(channel.deferredMessages), 1)
Expand All @@ -132,13 +131,13 @@ func TestChannelEmptyConsumer(t *testing.T) {
topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel")
client := NewClientV2(&Context{nsqd}, conn)
client := NewClientV2(0, conn, &Context{nsqd})
client.SetReadyCount(25)
channel.AddClient(client)
channel.AddClient(client.ID, client)

for i := 0; i < 25; i++ {
msg := nsq.NewMessage(<-nsqd.idChan, []byte("test"))
channel.StartInFlightTimeout(msg, client)
channel.StartInFlightTimeout(msg, 0)
client.SendingMessage()
}

Expand Down
4 changes: 3 additions & 1 deletion nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ClientV2 struct {
net.Conn
sync.Mutex

ID int64
context *Context
tlsConn net.Conn

Expand Down Expand Up @@ -62,13 +63,14 @@ type ClientV2 struct {
HeartbeatUpdateChan chan time.Duration
}

func NewClientV2(context *Context, conn net.Conn) *ClientV2 {
func NewClientV2(id int64, conn net.Conn, context *Context) *ClientV2 {
var identifier string
if conn != nil {
identifier, _, _ = net.SplitHostPort(conn.RemoteAddr().String())
}

c := &ClientV2{
ID: id,
Conn: conn,
context: context,

Expand Down
31 changes: 19 additions & 12 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,27 @@ import (

type NSQd struct {
sync.RWMutex
options *nsqdOptions
workerId int64
topicMap map[string]*Topic

options *nsqdOptions
workerId int64

clientIDSequence int64

topicMap map[string]*Topic

lookupdTCPAddrs util.StringArray
tcpAddr *net.TCPAddr
httpAddr *net.TCPAddr
tcpListener net.Listener
httpListener net.Listener
idChan chan nsq.MessageID
exitChan chan int
waitGroup util.WaitGroupWrapper
lookupPeers []*nsq.LookupPeer
notifyChan chan interface{}
tlsConfig *tls.Config

tcpAddr *net.TCPAddr
httpAddr *net.TCPAddr
tcpListener net.Listener
httpListener net.Listener
tlsConfig *tls.Config

idChan chan nsq.MessageID
notifyChan chan interface{}
exitChan chan int
waitGroup util.WaitGroupWrapper
}

type nsqdOptions struct {
Expand Down
4 changes: 3 additions & 1 deletion nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,16 @@ func TestEphemeralChannel(t *testing.T) {
body := []byte("an_ephemeral_message")
topic := nsqd.GetTopic(topicName)
ephemeralChannel := topic.GetChannel("ch1#ephemeral")
client := NewClientV2(0, nil, &Context{nsqd})
ephemeralChannel.AddClient(client.ID, client)

msg := nsq.NewMessage(<-nsqd.idChan, body)
topic.PutMessage(msg)
msg = <-ephemeralChannel.clientMsgChan
assert.Equal(t, msg.Body, body)

log.Printf("pulling from channel")
ephemeralChannel.RemoveClient(nil)
ephemeralChannel.RemoveClient(client.ID)

time.Sleep(50 * time.Millisecond)

Expand Down
15 changes: 8 additions & 7 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func (p *ProtocolV2) IOLoop(conn net.Conn) error {
var line []byte
var zeroTime time.Time

client := NewClientV2(p.context, conn)
clientID := atomic.AddInt64(&p.context.nsqd.clientIDSequence, 1)
client := NewClientV2(clientID, conn, p.context)
go p.messagePump(client)
for {
if client.HeartbeatInterval > 0 {
Expand Down Expand Up @@ -106,7 +107,7 @@ func (p *ProtocolV2) SendMessage(client *ClientV2, msg *nsq.Message, buf *bytes.
return err
}

client.Channel.StartInFlightTimeout(msg, client)
client.Channel.StartInFlightTimeout(msg, client.ID)
client.SendingMessage()

err = p.Send(client, nsq.FrameTypeMessage, buf.Bytes())
Expand Down Expand Up @@ -264,7 +265,7 @@ exit:
client.Heartbeat.Stop()
client.OutputBufferTimeout.Stop()
if subChannel != nil {
subChannel.RemoveClient(client)
subChannel.RemoveClient(client.ID)
}
if err != nil {
log.Printf("PROTOCOL(V2): [%s] messagePump error - %s", client, err.Error())
Expand Down Expand Up @@ -378,7 +379,7 @@ func (p *ProtocolV2) SUB(client *ClientV2, params [][]byte) ([]byte, error) {

topic := p.context.nsqd.GetTopic(topicName)
channel := topic.GetChannel(channelName)
channel.AddClient(client)
channel.AddClient(client.ID, client)

atomic.StoreInt32(&client.State, nsq.StateSubscribed)
client.Channel = channel
Expand Down Expand Up @@ -434,7 +435,7 @@ func (p *ProtocolV2) FIN(client *ClientV2, params [][]byte) ([]byte, error) {
}

id := *(*nsq.MessageID)(unsafe.Pointer(&params[1][0]))
err := client.Channel.FinishMessage(client, id)
err := client.Channel.FinishMessage(client.ID, id)
if err != nil {
return nil, nsq.NewClientErr(err, "E_FIN_FAILED",
fmt.Sprintf("FIN %s failed %s", id, err.Error()))
Expand Down Expand Up @@ -468,7 +469,7 @@ func (p *ProtocolV2) REQ(client *ClientV2, params [][]byte) ([]byte, error) {
fmt.Sprintf("REQ timeout %d out of range 0-%d", timeoutDuration, maxTimeout))
}

err = client.Channel.RequeueMessage(client, id, timeoutDuration)
err = client.Channel.RequeueMessage(client.ID, id, timeoutDuration)
if err != nil {
return nil, nsq.NewClientErr(err, "E_REQ_FAILED",
fmt.Sprintf("REQ %s failed %s", id, err.Error()))
Expand Down Expand Up @@ -606,7 +607,7 @@ func (p *ProtocolV2) TOUCH(client *ClientV2, params [][]byte) ([]byte, error) {
}

id := *(*nsq.MessageID)(unsafe.Pointer(&params[1][0]))
err := client.Channel.TouchMessage(client, id)
err := client.Channel.TouchMessage(client.ID, id)
if err != nil {
return nil, nsq.NewClientErr(err, "E_TOUCH_FAILED",
fmt.Sprintf("TOUCH %s failed %s", id, err.Error()))
Expand Down
2 changes: 1 addition & 1 deletion nsqd/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ func BenchmarkProtocolV2Exec(b *testing.B) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
p := &ProtocolV2{}
c := NewClientV2(nil, nil)
c := NewClientV2(0, nil, nil)
params := [][]byte{[]byte("NOP")}
b.StartTimer()

Expand Down