Skip to content

Commit

Permalink
Merge pull request #615 from mreiferson/lookup_peer_oom_615
Browse files Browse the repository at this point in the history
nsqd: OOM when misconfiguring lookupd addresses
  • Loading branch information
jehiah committed Oct 4, 2015
2 parents ba7c4b4 + 21c9e9e commit 27c4556
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
2 changes: 1 addition & 1 deletion nsqd/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (n *NSQD) lookupLoop() {
continue
}
n.logf("LOOKUP(%s): adding peer", host)
lookupPeer := newLookupPeer(host, n.getOpts().Logger,
lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.getOpts().Logger,
connectCallback(n, hostname, syncTopicChan))
lookupPeer.Command(nil) // start the connection
lookupPeers = append(lookupPeers, lookupPeer)
Expand Down
32 changes: 30 additions & 2 deletions nsqd/lookup_peer.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package nsqd

import (
"encoding/binary"
"fmt"
"io"
"net"
"time"

Expand All @@ -19,6 +21,7 @@ type lookupPeer struct {
conn net.Conn
state int32
connectCallback func(*lookupPeer)
maxBodySize int64
Info peerInfo
}

Expand All @@ -33,11 +36,12 @@ type peerInfo struct {
// newLookupPeer creates a new lookupPeer instance connecting to the supplied address.
//
// The supplied connectCallback will be called *every* time the instance connects.
func newLookupPeer(addr string, l logger, connectCallback func(*lookupPeer)) *lookupPeer {
func newLookupPeer(addr string, maxBodySize int64, l logger, connectCallback func(*lookupPeer)) *lookupPeer {
return &lookupPeer{
l: l,
addr: addr,
state: stateDisconnected,
maxBodySize: maxBodySize,
connectCallback: connectCallback,
}
}
Expand Down Expand Up @@ -106,10 +110,34 @@ func (lp *lookupPeer) Command(cmd *nsq.Command) ([]byte, error) {
lp.Close()
return nil, err
}
resp, err := nsq.ReadResponse(lp)
resp, err := readResponseBounded(lp, lp.maxBodySize)
if err != nil {
lp.Close()
return nil, err
}
return resp, nil
}

func readResponseBounded(r io.Reader, limit int64) ([]byte, error) {
var msgSize int32

// message size
err := binary.Read(r, binary.BigEndian, &msgSize)
if err != nil {
return nil, err
}

if int64(msgSize) > limit {
return nil, fmt.Errorf("response body size (%d) is greater than limit (%d)",
msgSize, limit)
}

// message binary data
buf := make([]byte, msgSize)
_, err = io.ReadFull(r, buf)
if err != nil {
return nil, err
}

return buf, nil
}

0 comments on commit 27c4556

Please sign in to comment.