From de57a276b503550feab246dedd24e618bd79d49e Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Wed, 6 Jan 2016 15:08:36 -0800 Subject: [PATCH 1/4] nsqd: don't need to RLock channel on every requeue doRequeue is always called inside exitLock now --- nsqd/channel.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/nsqd/channel.go b/nsqd/channel.go index 979972e60..1edb12bd3 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -382,7 +382,10 @@ func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Dura c.removeFromInFlightPQ(msg) if timeout == 0 { - return c.doRequeue(msg) + c.exitMutex.RLock() + err := c.doRequeue(msg) + c.exitMutex.RUnlock() + return err } // deferred requeue @@ -442,12 +445,9 @@ func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) erro } // doRequeue performs the low level operations to requeue a message +// +// Callers of this method need to ensure that a simultaneous exit will not occur func (c *Channel) doRequeue(m *Message) error { - c.RLock() - defer c.RUnlock() - if atomic.LoadInt32(&c.exitFlag) == 1 { - return errors.New("exiting") - } err := c.put(m) if err != nil { return err From 8e8976946da66656af55c9c6e2de5e76e588a460 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Wed, 6 Jan 2016 15:09:48 -0800 Subject: [PATCH 2/4] nsqd: use queue-specific lock for inflight/deferred --- nsqd/channel.go | 31 ++++++++++++++----------------- nsqd/http_test.go | 4 ++-- nsqd/protocol_v2_test.go | 8 ++++---- 3 files changed, 20 insertions(+), 23 deletions(-) diff --git a/nsqd/channel.go b/nsqd/channel.go index 1edb12bd3..b3b74d838 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -458,31 +458,31 @@ func (c *Channel) doRequeue(m *Message) error { // pushInFlightMessage atomically adds a message to the in-flight dictionary func (c *Channel) pushInFlightMessage(msg *Message) error { - c.Lock() + c.inFlightMutex.Lock() _, ok := c.inFlightMessages[msg.ID] if ok { - c.Unlock() + c.inFlightMutex.Unlock() return errors.New("ID already in flight") } c.inFlightMessages[msg.ID] = msg - c.Unlock() + c.inFlightMutex.Unlock() return nil } // popInFlightMessage atomically removes a message from the in-flight dictionary func (c *Channel) popInFlightMessage(clientID int64, id MessageID) (*Message, error) { - c.Lock() + c.inFlightMutex.Lock() msg, ok := c.inFlightMessages[id] if !ok { - c.Unlock() + c.inFlightMutex.Unlock() return nil, errors.New("ID not in flight") } if msg.clientID != clientID { - c.Unlock() + c.inFlightMutex.Unlock() return nil, errors.New("client does not own message") } delete(c.inFlightMessages, id) - c.Unlock() + c.inFlightMutex.Unlock() return msg, nil } @@ -504,39 +504,36 @@ func (c *Channel) removeFromInFlightPQ(msg *Message) { } func (c *Channel) pushDeferredMessage(item *pqueue.Item) error { - c.Lock() - defer c.Unlock() - + c.deferredMutex.Lock() // TODO: these map lookups are costly id := item.Value.(*Message).ID _, ok := c.deferredMessages[id] if ok { + c.deferredMutex.Unlock() return errors.New("ID already deferred") } c.deferredMessages[id] = item - + c.deferredMutex.Unlock() return nil } func (c *Channel) popDeferredMessage(id MessageID) (*pqueue.Item, error) { - c.Lock() - defer c.Unlock() - + c.deferredMutex.Lock() // TODO: these map lookups are costly item, ok := c.deferredMessages[id] if !ok { + c.deferredMutex.Unlock() return nil, errors.New("ID not deferred") } delete(c.deferredMessages, id) - + c.deferredMutex.Unlock() return item, nil } func (c *Channel) addToDeferredPQ(item *pqueue.Item) { c.deferredMutex.Lock() - defer c.deferredMutex.Unlock() - heap.Push(&c.deferredPQ, item) + c.deferredMutex.Unlock() } // messagePump reads messages from either memory or backend and sends diff --git a/nsqd/http_test.go b/nsqd/http_test.go index b82577484..992d701b6 100644 --- a/nsqd/http_test.go +++ b/nsqd/http_test.go @@ -176,9 +176,9 @@ func TestHTTPpubDefer(t *testing.T) { time.Sleep(5 * time.Millisecond) - ch.Lock() + ch.deferredMutex.Lock() numDef := len(ch.deferredMessages) - ch.Unlock() + ch.deferredMutex.Unlock() equal(t, numDef, 1) } diff --git a/nsqd/protocol_v2_test.go b/nsqd/protocol_v2_test.go index bd33f9bbd..fb6d954ad 100644 --- a/nsqd/protocol_v2_test.go +++ b/nsqd/protocol_v2_test.go @@ -580,9 +580,9 @@ func TestDPUB(t *testing.T) { time.Sleep(25 * time.Millisecond) ch := nsqd.GetTopic(topicName).GetChannel("ch") - ch.Lock() + ch.deferredMutex.Lock() numDef := len(ch.deferredMessages) - ch.Unlock() + ch.deferredMutex.Unlock() equal(t, numDef, 1) // duration out of range @@ -1255,9 +1255,9 @@ func TestSampling(t *testing.T) { }() <-doneChan - channel.Lock() + channel.inFlightMutex.Lock() numInFlight := len(channel.inFlightMessages) - channel.Unlock() + channel.inFlightMutex.Unlock() equal(t, numInFlight <= int(float64(num)*float64(sampleRate+slack)/100.0), true) equal(t, numInFlight >= int(float64(num)*float64(sampleRate-slack)/100.0), true) From 70433e07ec3dcb5d03c4d3ccc3fa1161a316d41b Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Wed, 6 Jan 2016 15:10:26 -0800 Subject: [PATCH 3/4] nsqd: tighten up stats related locking even more just as we don't need to lock the whole instance for the lifetime of the stats gathering, we don't need to lock the topic for the lifetime of the channel stats gathering --- nsqd/stats.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/nsqd/stats.go b/nsqd/stats.go index daf792d72..40934683b 100644 --- a/nsqd/stats.go +++ b/nsqd/stats.go @@ -121,19 +121,17 @@ func (n *NSQD) GetStats() []TopicStats { for _, t := range n.topicMap { realTopics = append(realTopics, t) } + n.RUnlock() sort.Sort(TopicsByName{realTopics}) - topics := make([]TopicStats, 0, len(n.topicMap)) - n.RUnlock() for _, t := range realTopics { t.RLock() - realChannels := make([]*Channel, 0, len(t.channelMap)) for _, c := range t.channelMap { realChannels = append(realChannels, c) } + t.RUnlock() sort.Sort(ChannelsByName{realChannels}) - channels := make([]ChannelStats, 0, len(t.channelMap)) for _, c := range realChannels { c.RLock() @@ -141,14 +139,10 @@ func (n *NSQD) GetStats() []TopicStats { for _, client := range c.clients { clients = append(clients, client.Stats()) } - channels = append(channels, NewChannelStats(c, clients)) c.RUnlock() + channels = append(channels, NewChannelStats(c, clients)) } - topics = append(topics, NewTopicStats(t, channels)) - - t.RUnlock() } - return topics } From 48b2872b8d0eba1eaafef97ea054e6ba85b9529a Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Wed, 6 Jan 2016 15:11:25 -0800 Subject: [PATCH 4/4] nsqd: separate client locks into write and meta locks --- nsqd/client_v2.go | 39 ++++++++++++++++++++------------------- nsqd/protocol_v2.go | 18 +++++++++--------- 2 files changed, 29 insertions(+), 28 deletions(-) diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index 50f7270ae..3318bb218 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -58,7 +58,8 @@ type clientV2 struct { FinishCount uint64 RequeueCount uint64 - sync.RWMutex + writeLock sync.RWMutex + metaLock sync.RWMutex ID int64 ctx *context @@ -166,11 +167,11 @@ func (c *clientV2) Identify(data identifyDataV2) error { clientID = data.ShortID } - c.Lock() + c.metaLock.Lock() c.ClientID = clientID c.Hostname = hostname c.UserAgent = data.UserAgent - c.Unlock() + c.metaLock.Unlock() err := c.SetHeartbeatInterval(data.HeartbeatInterval) if err != nil { @@ -214,7 +215,7 @@ func (c *clientV2) Identify(data identifyDataV2) error { } func (c *clientV2) Stats() ClientStats { - c.RLock() + c.metaLock.RLock() // TODO: deprecated, remove in 1.0 name := c.ClientID @@ -227,7 +228,7 @@ func (c *clientV2) Stats() ClientStats { identity = c.AuthState.Identity identityURL = c.AuthState.IdentityURL } - c.RUnlock() + c.metaLock.RUnlock() stats := ClientStats{ // TODO: deprecated, remove in 1.0 Name: name, @@ -392,8 +393,8 @@ func (c *clientV2) UnPause() { } func (c *clientV2) SetHeartbeatInterval(desiredInterval int) error { - c.Lock() - defer c.Unlock() + c.writeLock.Lock() + defer c.writeLock.Unlock() switch { case desiredInterval == -1: @@ -426,8 +427,8 @@ func (c *clientV2) SetOutputBufferSize(desiredSize int) error { } if size > 0 { - c.Lock() - defer c.Unlock() + c.writeLock.Lock() + defer c.writeLock.Unlock() c.OutputBufferSize = size err := c.Writer.Flush() if err != nil { @@ -440,8 +441,8 @@ func (c *clientV2) SetOutputBufferSize(desiredSize int) error { } func (c *clientV2) SetOutputBufferTimeout(desiredTimeout int) error { - c.Lock() - defer c.Unlock() + c.writeLock.Lock() + defer c.writeLock.Unlock() switch { case desiredTimeout == -1: @@ -467,8 +468,8 @@ func (c *clientV2) SetSampleRate(sampleRate int32) error { } func (c *clientV2) SetMsgTimeout(msgTimeout int) error { - c.Lock() - defer c.Unlock() + c.writeLock.Lock() + defer c.writeLock.Unlock() switch { case msgTimeout == 0: @@ -484,8 +485,8 @@ func (c *clientV2) SetMsgTimeout(msgTimeout int) error { } func (c *clientV2) UpgradeTLS() error { - c.Lock() - defer c.Unlock() + c.writeLock.Lock() + defer c.writeLock.Unlock() tlsConn := tls.Server(c.Conn, c.ctx.nsqd.tlsConfig) tlsConn.SetDeadline(time.Now().Add(5 * time.Second)) @@ -504,8 +505,8 @@ func (c *clientV2) UpgradeTLS() error { } func (c *clientV2) UpgradeDeflate(level int) error { - c.Lock() - defer c.Unlock() + c.writeLock.Lock() + defer c.writeLock.Unlock() conn := c.Conn if c.tlsConn != nil { @@ -524,8 +525,8 @@ func (c *clientV2) UpgradeDeflate(level int) error { } func (c *clientV2) UpgradeSnappy() error { - c.Lock() - defer c.Unlock() + c.writeLock.Lock() + defer c.writeLock.Unlock() conn := c.Conn if c.tlsConn != nil { diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 4f34caabe..809d24fce 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -144,7 +144,7 @@ func (p *protocolV2) SendMessage(client *clientV2, msg *Message, buf *bytes.Buff } func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error { - client.Lock() + client.writeLock.Lock() var zeroTime time.Time if client.HeartbeatInterval > 0 { @@ -155,7 +155,7 @@ func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error _, err := protocol.SendFramedResponse(client.Writer, frameType, data) if err != nil { - client.Unlock() + client.writeLock.Unlock() return err } @@ -163,7 +163,7 @@ func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error err = client.Flush() } - client.Unlock() + client.writeLock.Unlock() return err } @@ -239,9 +239,9 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { clientMsgChan = nil flusherChan = nil // force flush - client.Lock() + client.writeLock.Lock() err = client.Flush() - client.Unlock() + client.writeLock.Unlock() if err != nil { goto exit } @@ -263,9 +263,9 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { // if this case wins, we're either starved // or we won the race between other channels... // in either case, force flush - client.Lock() + client.writeLock.Lock() err = client.Flush() - client.Unlock() + client.writeLock.Unlock() if err != nil { goto exit } @@ -898,9 +898,9 @@ func (p *protocolV2) TOUCH(client *clientV2, params [][]byte) ([]byte, error) { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", err.Error()) } - client.RLock() + client.writeLock.RLock() msgTimeout := client.MsgTimeout - client.RUnlock() + client.writeLock.RUnlock() err = client.Channel.TouchMessage(client.ID, *id, msgTimeout) if err != nil { return nil, protocol.NewClientErr(err, "E_TOUCH_FAILED",