Skip to content

Commit

Permalink
Merge pull request #701 from mreiferson/lock_contention_701
Browse files Browse the repository at this point in the history
nsqd: reduce client/channel lock contention
  • Loading branch information
jehiah committed Jan 7, 2016
2 parents 0bec55b + 48b2872 commit bc0a6b9
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 66 deletions.
43 changes: 20 additions & 23 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,10 @@ func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Dura
c.removeFromInFlightPQ(msg)

if timeout == 0 {
return c.doRequeue(msg)
c.exitMutex.RLock()
err := c.doRequeue(msg)
c.exitMutex.RUnlock()
return err
}

// deferred requeue
Expand Down Expand Up @@ -442,12 +445,9 @@ func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) erro
}

// doRequeue performs the low level operations to requeue a message
//
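// Callers of this method need to ensure that a simultaneous exit will not occur
// (for example by holding c.exitMutex.RLock() for the duration of the call, as
// RequeueMessage does)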
func (c *Channel) doRequeue(m *Message) error {
c.RLock()
defer c.RUnlock()
if atomic.LoadInt32(&c.exitFlag) == 1 {
return errors.New("exiting")
}
err := c.put(m)
if err != nil {
return err
Expand All @@ -458,31 +458,31 @@ func (c *Channel) doRequeue(m *Message) error {

// pushInFlightMessage atomically adds a message to the in-flight dictionary.
//
// It returns an "ID already in flight" error when msg.ID is already a key of
// c.inFlightMessages; otherwise it stores msg under msg.ID and returns nil.
//
// NOTE(review): this span is a rendered diff whose +/- markers were lost, so
// two locking schemes appear interleaved. Presumably the c.Lock()/c.Unlock()
// lines are the removed side and the c.inFlightMutex lines the added side;
// confirm against the commit before treating this as compilable source.
func (c *Channel) pushInFlightMessage(msg *Message) error {
c.Lock()
c.inFlightMutex.Lock()
_, ok := c.inFlightMessages[msg.ID]
if ok {
// duplicate ID: release the lock before reporting the error
c.Unlock()
c.inFlightMutex.Unlock()
return errors.New("ID already in flight")
}
c.inFlightMessages[msg.ID] = msg
c.Unlock()
c.inFlightMutex.Unlock()
return nil
}

// popInFlightMessage atomically removes a message from the in-flight dictionary.
//
// It looks up id in c.inFlightMessages and returns:
//   - an "ID not in flight" error when there is no entry for id;
//   - a "client does not own message" error when the entry's clientID differs
//     from the clientID argument (the entry is left in the map);
//   - otherwise the message, after deleting it from the map.
//
// NOTE(review): this span is a rendered diff whose +/- markers were lost.
// Presumably the c.Lock()/c.Unlock() lines are the removed side and the
// c.inFlightMutex lines the added side; confirm against the commit.
func (c *Channel) popInFlightMessage(clientID int64, id MessageID) (*Message, error) {
c.Lock()
c.inFlightMutex.Lock()
msg, ok := c.inFlightMessages[id]
if !ok {
c.Unlock()
c.inFlightMutex.Unlock()
return nil, errors.New("ID not in flight")
}
// ownership check: only the client recorded on the message may remove it
if msg.clientID != clientID {
c.Unlock()
c.inFlightMutex.Unlock()
return nil, errors.New("client does not own message")
}
delete(c.inFlightMessages, id)
c.Unlock()
c.inFlightMutex.Unlock()
return msg, nil
}

Expand All @@ -504,39 +504,36 @@ func (c *Channel) removeFromInFlightPQ(msg *Message) {
}

// pushDeferredMessage records item in c.deferredMessages, keyed by the ID of
// the *Message stored in item.Value.
//
// It returns an "ID already deferred" error when that ID is already present;
// otherwise it stores item and returns nil. item.Value is type-asserted to
// *Message without the two-value form, so any other dynamic type panics.
//
// NOTE(review): this span is a rendered diff whose +/- markers were lost.
// Presumably c.Lock()/defer c.Unlock() are the removed side and the
// c.deferredMutex lines the added side; confirm against the commit.
func (c *Channel) pushDeferredMessage(item *pqueue.Item) error {
c.Lock()
defer c.Unlock()

c.deferredMutex.Lock()
// TODO: these map lookups are costly
id := item.Value.(*Message).ID
_, ok := c.deferredMessages[id]
if ok {
c.deferredMutex.Unlock()
return errors.New("ID already deferred")
}
c.deferredMessages[id] = item

c.deferredMutex.Unlock()
return nil
}

// popDeferredMessage removes and returns the deferred item stored under id in
// c.deferredMessages.
//
// It returns an "ID not deferred" error when there is no entry for id.
//
// NOTE(review): this span is a rendered diff whose +/- markers were lost.
// Presumably c.Lock()/defer c.Unlock() are the removed side and the
// c.deferredMutex lines the added side; confirm against the commit.
func (c *Channel) popDeferredMessage(id MessageID) (*pqueue.Item, error) {
c.Lock()
defer c.Unlock()

c.deferredMutex.Lock()
// TODO: these map lookups are costly
item, ok := c.deferredMessages[id]
if !ok {
c.deferredMutex.Unlock()
return nil, errors.New("ID not deferred")
}
delete(c.deferredMessages, id)

c.deferredMutex.Unlock()
return item, nil
}

// addToDeferredPQ pushes item onto c.deferredPQ via heap.Push while holding
// c.deferredMutex.
//
// NOTE(review): this span is a rendered diff whose +/- markers were lost. Read
// literally it both defers c.deferredMutex.Unlock() and calls it explicitly,
// which would unlock twice; presumably the defer line is the removed side and
// the trailing explicit Unlock the added side. Confirm against the commit.
func (c *Channel) addToDeferredPQ(item *pqueue.Item) {
c.deferredMutex.Lock()
defer c.deferredMutex.Unlock()

heap.Push(&c.deferredPQ, item)
c.deferredMutex.Unlock()
}

// messagePump reads messages from either memory or backend and sends
Expand Down
39 changes: 20 additions & 19 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ type clientV2 struct {
FinishCount uint64
RequeueCount uint64

sync.RWMutex
writeLock sync.RWMutex
metaLock sync.RWMutex

ID int64
ctx *context
Expand Down Expand Up @@ -166,11 +167,11 @@ func (c *clientV2) Identify(data identifyDataV2) error {
clientID = data.ShortID
}

c.Lock()
c.metaLock.Lock()
c.ClientID = clientID
c.Hostname = hostname
c.UserAgent = data.UserAgent
c.Unlock()
c.metaLock.Unlock()

err := c.SetHeartbeatInterval(data.HeartbeatInterval)
if err != nil {
Expand Down Expand Up @@ -214,7 +215,7 @@ func (c *clientV2) Identify(data identifyDataV2) error {
}

func (c *clientV2) Stats() ClientStats {
c.RLock()
c.metaLock.RLock()
// TODO: deprecated, remove in 1.0
name := c.ClientID

Expand All @@ -227,7 +228,7 @@ func (c *clientV2) Stats() ClientStats {
identity = c.AuthState.Identity
identityURL = c.AuthState.IdentityURL
}
c.RUnlock()
c.metaLock.RUnlock()
stats := ClientStats{
// TODO: deprecated, remove in 1.0
Name: name,
Expand Down Expand Up @@ -392,8 +393,8 @@ func (c *clientV2) UnPause() {
}

func (c *clientV2) SetHeartbeatInterval(desiredInterval int) error {
c.Lock()
defer c.Unlock()
c.writeLock.Lock()
defer c.writeLock.Unlock()

switch {
case desiredInterval == -1:
Expand Down Expand Up @@ -426,8 +427,8 @@ func (c *clientV2) SetOutputBufferSize(desiredSize int) error {
}

if size > 0 {
c.Lock()
defer c.Unlock()
c.writeLock.Lock()
defer c.writeLock.Unlock()
c.OutputBufferSize = size
err := c.Writer.Flush()
if err != nil {
Expand All @@ -440,8 +441,8 @@ func (c *clientV2) SetOutputBufferSize(desiredSize int) error {
}

func (c *clientV2) SetOutputBufferTimeout(desiredTimeout int) error {
c.Lock()
defer c.Unlock()
c.writeLock.Lock()
defer c.writeLock.Unlock()

switch {
case desiredTimeout == -1:
Expand All @@ -467,8 +468,8 @@ func (c *clientV2) SetSampleRate(sampleRate int32) error {
}

func (c *clientV2) SetMsgTimeout(msgTimeout int) error {
c.Lock()
defer c.Unlock()
c.writeLock.Lock()
defer c.writeLock.Unlock()

switch {
case msgTimeout == 0:
Expand All @@ -484,8 +485,8 @@ func (c *clientV2) SetMsgTimeout(msgTimeout int) error {
}

func (c *clientV2) UpgradeTLS() error {
c.Lock()
defer c.Unlock()
c.writeLock.Lock()
defer c.writeLock.Unlock()

tlsConn := tls.Server(c.Conn, c.ctx.nsqd.tlsConfig)
tlsConn.SetDeadline(time.Now().Add(5 * time.Second))
Expand All @@ -504,8 +505,8 @@ func (c *clientV2) UpgradeTLS() error {
}

func (c *clientV2) UpgradeDeflate(level int) error {
c.Lock()
defer c.Unlock()
c.writeLock.Lock()
defer c.writeLock.Unlock()

conn := c.Conn
if c.tlsConn != nil {
Expand All @@ -524,8 +525,8 @@ func (c *clientV2) UpgradeDeflate(level int) error {
}

func (c *clientV2) UpgradeSnappy() error {
c.Lock()
defer c.Unlock()
c.writeLock.Lock()
defer c.writeLock.Unlock()

conn := c.Conn
if c.tlsConn != nil {
Expand Down
4 changes: 2 additions & 2 deletions nsqd/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ func TestHTTPpubDefer(t *testing.T) {

time.Sleep(5 * time.Millisecond)

ch.Lock()
ch.deferredMutex.Lock()
numDef := len(ch.deferredMessages)
ch.Unlock()
ch.deferredMutex.Unlock()
equal(t, numDef, 1)
}

Expand Down
18 changes: 9 additions & 9 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (p *protocolV2) SendMessage(client *clientV2, msg *Message, buf *bytes.Buff
}

func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error {
client.Lock()
client.writeLock.Lock()

var zeroTime time.Time
if client.HeartbeatInterval > 0 {
Expand All @@ -155,15 +155,15 @@ func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error

_, err := protocol.SendFramedResponse(client.Writer, frameType, data)
if err != nil {
client.Unlock()
client.writeLock.Unlock()
return err
}

if frameType != frameTypeMessage {
err = client.Flush()
}

client.Unlock()
client.writeLock.Unlock()

return err
}
Expand Down Expand Up @@ -239,9 +239,9 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
clientMsgChan = nil
flusherChan = nil
// force flush
client.Lock()
client.writeLock.Lock()
err = client.Flush()
client.Unlock()
client.writeLock.Unlock()
if err != nil {
goto exit
}
Expand All @@ -263,9 +263,9 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
// if this case wins, we're either starved
// or we won the race between other channels...
// in either case, force flush
client.Lock()
client.writeLock.Lock()
err = client.Flush()
client.Unlock()
client.writeLock.Unlock()
if err != nil {
goto exit
}
Expand Down Expand Up @@ -898,9 +898,9 @@ func (p *protocolV2) TOUCH(client *clientV2, params [][]byte) ([]byte, error) {
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", err.Error())
}

client.RLock()
client.writeLock.RLock()
msgTimeout := client.MsgTimeout
client.RUnlock()
client.writeLock.RUnlock()
err = client.Channel.TouchMessage(client.ID, *id, msgTimeout)
if err != nil {
return nil, protocol.NewClientErr(err, "E_TOUCH_FAILED",
Expand Down
8 changes: 4 additions & 4 deletions nsqd/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,9 +580,9 @@ func TestDPUB(t *testing.T) {
time.Sleep(25 * time.Millisecond)

ch := nsqd.GetTopic(topicName).GetChannel("ch")
ch.Lock()
ch.deferredMutex.Lock()
numDef := len(ch.deferredMessages)
ch.Unlock()
ch.deferredMutex.Unlock()
equal(t, numDef, 1)

// duration out of range
Expand Down Expand Up @@ -1255,9 +1255,9 @@ func TestSampling(t *testing.T) {
}()
<-doneChan

channel.Lock()
channel.inFlightMutex.Lock()
numInFlight := len(channel.inFlightMessages)
channel.Unlock()
channel.inFlightMutex.Unlock()

equal(t, numInFlight <= int(float64(num)*float64(sampleRate+slack)/100.0), true)
equal(t, numInFlight >= int(float64(num)*float64(sampleRate-slack)/100.0), true)
Expand Down
12 changes: 3 additions & 9 deletions nsqd/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,34 +121,28 @@ func (n *NSQD) GetStats() []TopicStats {
for _, t := range n.topicMap {
realTopics = append(realTopics, t)
}
n.RUnlock()
sort.Sort(TopicsByName{realTopics})

topics := make([]TopicStats, 0, len(n.topicMap))

This comment has been minimized.

Copy link
@zachbadgett

zachbadgett Jan 7, 2016

Contributor

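This will cause a data race: `len(n.topicMap)` is now evaluated after `n.RUnlock()`, and reading a map (even just its length) is not safe while another goroutine may be writing to it.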

This comment has been minimized.

Copy link
@mreiferson

mreiferson Jan 7, 2016

Member

Good catch: the capacity here should be `len(realTopics)`, not `len(n.topicMap)`.

n.RUnlock()
for _, t := range realTopics {
t.RLock()

realChannels := make([]*Channel, 0, len(t.channelMap))
for _, c := range t.channelMap {
realChannels = append(realChannels, c)
}
t.RUnlock()
sort.Sort(ChannelsByName{realChannels})

channels := make([]ChannelStats, 0, len(t.channelMap))

This comment has been minimized.

Copy link
@mreiferson

mreiferson Jan 7, 2016

Member

And likewise this one should be `len(realChannels)`, not `len(t.channelMap)`.

for _, c := range realChannels {
c.RLock()
clients := make([]ClientStats, 0, len(c.clients))
for _, client := range c.clients {
clients = append(clients, client.Stats())
}
channels = append(channels, NewChannelStats(c, clients))
c.RUnlock()
channels = append(channels, NewChannelStats(c, clients))
}

topics = append(topics, NewTopicStats(t, channels))

t.RUnlock()
}

return topics
}

0 comments on commit bc0a6b9

Please sign in to comment.