Skip to content

Commit

Permalink
Merge pull request #142 from dustismo/broadcast_address_flag
Browse files Browse the repository at this point in the history
nsqd/nsqlookupd: add --broadcast-address flag
  • Loading branch information
mreiferson committed Feb 6, 2013
2 parents a054ba5 + 3c9367a commit 6b0285d
Show file tree
Hide file tree
Showing 17 changed files with 173 additions and 96 deletions.
19 changes: 16 additions & 3 deletions nsq/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,14 @@ func TestNsqdToLookupd(t *testing.T) {

producer := producers[0]
producerData, _ := producer.(map[string]interface{})
address := producerData["address"].(string)
address := producerData["address"].(string) //TODO: remove for 1.0
producerHostname := producerData["hostname"].(string)
broadcastAddress := producerData["broadcast_address"].(string)
port := int(producerData["tcp_port"].(float64))
tombstoned := producerData["tombstoned"].(bool)
assert.Equal(t, address, hostname)
assert.Equal(t, producerHostname, hostname)
assert.Equal(t, broadcastAddress, hostname)
assert.Equal(t, port, 4150)
assert.Equal(t, tombstoned, false)

Expand All @@ -61,10 +65,15 @@ func TestNsqdToLookupd(t *testing.T) {

producer = producers[0]
producerData, _ = producer.(map[string]interface{})
address = producerData["address"].(string)
address = producerData["address"].(string) //TODO: remove for 1.0
producerHostname = producerData["hostname"].(string)
broadcastAddress = producerData["broadcast_address"].(string)

port = int(producerData["tcp_port"].(float64))
tombstoned = producerData["tombstoned"].(bool)
assert.Equal(t, address, hostname)
assert.Equal(t, producerHostname, hostname)
assert.Equal(t, broadcastAddress, hostname)
assert.Equal(t, port, 4150)
assert.Equal(t, tombstoned, false)

Expand All @@ -78,9 +87,13 @@ func TestNsqdToLookupd(t *testing.T) {

producer = producers[0]
producerData, _ = producer.(map[string]interface{})
address = producerData["address"].(string)
address = producerData["address"].(string) //TODO: remove for 1.0
producerHostname = producerData["hostname"].(string)
broadcastAddress = producerData["broadcast_address"].(string)
port = int(producerData["tcp_port"].(float64))
assert.Equal(t, address, hostname)
assert.Equal(t, producerHostname, hostname)
assert.Equal(t, broadcastAddress, hostname)
assert.Equal(t, port, 4150)

channels, _ := data.Get("channels").Array()
Expand Down
9 changes: 5 additions & 4 deletions nsq/lookup_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ type LookupPeer struct {

// PeerInfo contains metadata for a LookupPeer instance (and is JSON marshalable)
type PeerInfo struct {
TcpPort int `json:"tcp_port"`
HttpPort int `json:"http_port"`
Version string `json:"version"`
Address string `json:"address"`
TcpPort int `json:"tcp_port"`
HttpPort int `json:"http_port"`
Version string `json:"version"`
Address string `json:"address"` //TODO: remove for 1.0
BroadcastAddress string `json:"broadcast_address"`
}

// NewLookupPeer creates a new LookupPeer instance connecting to the supplied address.
Expand Down
25 changes: 15 additions & 10 deletions nsqadmin/lookupd_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,13 @@ func getLookupdProducers(lookupdHTTPAddrs []string) ([]*Producer, error) {
producersArray, _ := producers.Array()
for i, _ := range producersArray {
producer := producers.GetIndex(i)
address := producer.Get("address").MustString()
address := producer.Get("address").MustString() //TODO: remove for 1.0
hostname := producer.Get("hostname").MustString()
broadcastAddress := producer.Get("broadcast_address").MustString()

httpPort := producer.Get("http_port").MustInt()
tcpPort := producer.Get("tcp_port").MustInt()
key := fmt.Sprintf("%s:%d:%d", address, httpPort, tcpPort)
key := fmt.Sprintf("%s:%d:%d", broadcastAddress, httpPort, tcpPort)
_, ok := allProducers[key]
if !ok {
topicList, _ := producer.Get("topics").Array()
Expand All @@ -130,12 +133,14 @@ func getLookupdProducers(lookupdHTTPAddrs []string) ([]*Producer, error) {
maxVersion = versionObj
}
p := &Producer{
Address: address,
TcpPort: tcpPort,
HttpPort: httpPort,
Version: version,
VersionObj: versionObj,
Topics: topics,
Address: address, //TODO: remove for 1.0
Hostname: hostname,
BroadcastAddress: broadcastAddress,
TcpPort: tcpPort,
HttpPort: httpPort,
Version: version,
VersionObj: versionObj,
Topics: topics,
}
allProducers[key] = p
output = append(output, p)
Expand Down Expand Up @@ -181,9 +186,9 @@ func getLookupdTopicProducers(topic string, lookupdHTTPAddrs []string) ([]string
producers, _ := data.Get("producers").Array()
for _, producer := range producers {
producer := producer.(map[string]interface{})
address := producer["address"].(string)
broadcastAddress := producer["broadcast_address"].(string)
port := int(producer["http_port"].(float64))
key := fmt.Sprintf("%s:%d", address, port)
key := fmt.Sprintf("%s:%d", broadcastAddress, port)
allSources = util.StringAdd(allSources, key)
}
}(endpoint)
Expand Down
20 changes: 11 additions & 9 deletions nsqadmin/statsinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ func TopicsForStrings(s []string) Topics {
}

type Producer struct {
Address string `json:"address"`
TcpPort int `json:"tcp_port"`
HttpPort int `json:"http_port"`
Version string `json:"version"`
VersionObj *semver.Version `json:-`
Topics []string `json:"topics"`
OutOfDate bool
Address string `json:"address"` //TODO: remove for 1.0
Hostname string `json:"hostname"`
BroadcastAddress string `json:"broadcast_address"`
TcpPort int `json:"tcp_port"`
HttpPort int `json:"http_port"`
Version string `json:"version"`
VersionObj *semver.Version `json:-`
Topics []string `json:"topics"`
OutOfDate bool
}

type TopicHostStats struct {
Expand Down Expand Up @@ -112,7 +114,7 @@ func (c TopicHostStatsByHost) Less(i, j int) bool {
return c.TopicHostStatsList[i].HostAddress < c.TopicHostStatsList[j].HostAddress
}
func (c ProducersByHost) Less(i, j int) bool {
return c.ProducerList[i].Address < c.ProducerList[j].Address
return c.ProducerList[i].BroadcastAddress < c.ProducerList[j].BroadcastAddress
}

func (c *ChannelStats) AddHostStats(a *ChannelStats) {
Expand Down Expand Up @@ -145,5 +147,5 @@ func (t *TopicHostStats) AddHostStats(a *TopicHostStats) {
}

func (p *Producer) HTTPAddress() string {
return fmt.Sprintf("%s:%d", p.Address, p.HttpPort)
return fmt.Sprintf("%s:%d", p.BroadcastAddress, p.HttpPort)
}
6 changes: 4 additions & 2 deletions nsqadmin/templates/nodes.html
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ <h2>NSQD Hosts ({{.Producers | len}})</h2>
<div class="row-fluid"><div class="span12">
<table class="table table-bordered">
<tr>
<th>Host</th>
<th>Hostname</th>
<th>Broadcast Address</th>
<th>TCP Port</th>
<th>HTTP Port</th>
<th>Version</th>
<th>Topics</th>
</tr>
{{range .Producers }}
<tr {{if .OutOfDate}} class="warning"{{end}} >
<td>{{.Address}}</td>
<td>{{.Hostname}}</td>
<td>{{.BroadcastAddress}}
<td>{{.TcpPort}}</td>
<td>{{.HttpPort}}</td>
<td>{{.Version}}</td>
Expand Down
3 changes: 2 additions & 1 deletion nsqd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ It listens on two TCP ports, one for clients and another for the HTTP API.
-verbose=false: enable verbose logging
-version=false: print version string
-worker-id=0: unique identifier (int) for this worker (will default to a hash of hostname)

-broadcast-address: the address for this worker. this is registered with nsqlookupd (defaults to OS hostname)

### Statsd / Graphite Integration

When using `--statsd-address` specify the UDP `<addr>:<port>` for
Expand Down
15 changes: 12 additions & 3 deletions nsqd/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ func (n *NSQd) lookupLoop() {
ci["version"] = util.BINARY_VERSION
ci["tcp_port"] = n.tcpAddr.Port
ci["http_port"] = n.httpAddr.Port
ci["address"] = hostname
ci["address"] = hostname //TODO: drop for 1.0
ci["hostname"] = hostname
ci["broadcast_address"] = n.options.broadcastAddress

cmd, err := nsq.Identify(ci)
if err != nil {
lp.Close()
Expand Down Expand Up @@ -138,10 +141,16 @@ exit:
func (n *NSQd) lookupHttpAddrs() []string {
var lookupHttpAddrs []string
for _, lp := range n.lookupPeers {
if len(lp.Info.Address) <= 0 {

//TODO: remove for 1.0
if len(lp.Info.BroadcastAddress) <= 0 {
lp.Info.BroadcastAddress = lp.Info.Address
}

if len(lp.Info.BroadcastAddress) <= 0 {
continue
}
addr := net.JoinHostPort(lp.Info.Address, strconv.Itoa(lp.Info.HttpPort))
addr := net.JoinHostPort(lp.Info.BroadcastAddress, strconv.Itoa(lp.Info.HttpPort))
lookupHttpAddrs = append(lookupHttpAddrs, addr)
}
return lookupHttpAddrs
Expand Down
38 changes: 22 additions & 16 deletions nsqd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,23 @@ import (
)

var (
showVersion = flag.Bool("version", false, "print version string")
httpAddress = flag.String("http-address", "0.0.0.0:4151", "<addr>:<port> to listen on for HTTP clients")
tcpAddress = flag.String("tcp-address", "0.0.0.0:4150", "<addr>:<port> to listen on for TCP clients")
memQueueSize = flag.Int64("mem-queue-size", 10000, "number of messages to keep in memory (per topic/channel)")
maxBytesPerFile = flag.Int64("max-bytes-per-file", 104857600, "number of bytes per diskqueue file before rolling")
syncEvery = flag.Int64("sync-every", 2500, "number of messages between diskqueue syncs")
msgTimeout = flag.String("msg-timeout", "60s", "duration to wait before auto-requeing a message")
maxMessageSize = flag.Int64("max-message-size", 1024768, "maximum size of a single message in bytes")
maxBodySize = flag.Int64("max-body-size", 5*1024768, "maximum size of a single command body")
maxMsgTimeout = flag.Duration("max-msg-timeout", 15*time.Minute, "maximum duration before a message will timeout")
dataPath = flag.String("data-path", "", "path to store disk-backed messages")
workerId = flag.Int64("worker-id", 0, "unique identifier (int) for this worker (will default to a hash of hostname)")
verbose = flag.Bool("verbose", false, "enable verbose logging")
statsdAddress = flag.String("statsd-address", "", "UDP <addr>:<port> of a statsd daemon for writing stats")
statsdInterval = flag.Int("statsd-interval", 30, "seconds between pushing to statsd")
lookupdTCPAddrs = util.StringArray{}
showVersion = flag.Bool("version", false, "print version string")
httpAddress = flag.String("http-address", "0.0.0.0:4151", "<addr>:<port> to listen on for HTTP clients")
tcpAddress = flag.String("tcp-address", "0.0.0.0:4150", "<addr>:<port> to listen on for TCP clients")
memQueueSize = flag.Int64("mem-queue-size", 10000, "number of messages to keep in memory (per topic/channel)")
maxBytesPerFile = flag.Int64("max-bytes-per-file", 104857600, "number of bytes per diskqueue file before rolling")
syncEvery = flag.Int64("sync-every", 2500, "number of messages between diskqueue syncs")
msgTimeout = flag.String("msg-timeout", "60s", "duration to wait before auto-requeing a message")
maxMessageSize = flag.Int64("max-message-size", 1024768, "maximum size of a single message in bytes")
maxBodySize = flag.Int64("max-body-size", 5*1024768, "maximum size of a single command body")
maxMsgTimeout = flag.Duration("max-msg-timeout", 15*time.Minute, "maximum duration before a message will timeout")
dataPath = flag.String("data-path", "", "path to store disk-backed messages")
workerId = flag.Int64("worker-id", 0, "unique identifier (int) for this worker (will default to a hash of hostname)")
verbose = flag.Bool("verbose", false, "enable verbose logging")
statsdAddress = flag.String("statsd-address", "", "UDP <addr>:<port> of a statsd daemon for writing stats")
statsdInterval = flag.Int("statsd-interval", 30, "seconds between pushing to statsd")
broadcastAddress = flag.String("broadcast-address", "", "address that will be registered with lookupd, (default to the OS hostname)")
lookupdTCPAddrs = util.StringArray{}
)

func init() {
Expand Down Expand Up @@ -74,6 +75,10 @@ func main() {
log.Fatal(err)
}

if *broadcastAddress == "" {
*broadcastAddress = hostname
}

log.Printf("nsqd v%s", util.BINARY_VERSION)
log.Printf("worker id %d", *workerId)

Expand Down Expand Up @@ -116,6 +121,7 @@ func main() {
options.syncEvery = *syncEvery
options.msgTimeout = msgTimeoutDuration
options.maxMsgTimeout = *maxMsgTimeout
options.broadcastAddress = *broadcastAddress

nsqd = NewNSQd(*workerId, options)
nsqd.tcpAddr = tcpAddr
Expand Down
38 changes: 20 additions & 18 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,30 @@ type NSQd struct {
}

type nsqdOptions struct {
memQueueSize int64
dataPath string
maxMessageSize int64
maxBodySize int64
maxBytesPerFile int64
syncEvery int64
msgTimeout time.Duration
maxMsgTimeout time.Duration
clientTimeout time.Duration
memQueueSize int64
dataPath string
maxMessageSize int64
maxBodySize int64
maxBytesPerFile int64
syncEvery int64
msgTimeout time.Duration
maxMsgTimeout time.Duration
clientTimeout time.Duration
broadcastAddress string
}

func NewNsqdOptions() *nsqdOptions {
return &nsqdOptions{
memQueueSize: 10000,
dataPath: os.TempDir(),
maxMessageSize: 1024768,
maxBodySize: 5 * 1024768,
maxBytesPerFile: 104857600,
syncEvery: 2500,
msgTimeout: 60 * time.Second,
maxMsgTimeout: 15 * time.Minute,
clientTimeout: nsq.DefaultClientTimeout,
memQueueSize: 10000,
dataPath: os.TempDir(),
maxMessageSize: 1024768,
maxBodySize: 5 * 1024768,
maxBytesPerFile: 104857600,
syncEvery: 2500,
msgTimeout: 60 * time.Second,
maxMsgTimeout: 15 * time.Minute,
clientTimeout: nsq.DefaultClientTimeout,
broadcastAddress: "",
}
}

Expand Down
1 change: 1 addition & 0 deletions nsqlookupd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ interface for clients to perform discovery and administrative actions.
-http-address="0.0.0.0:4161": <addr>:<port> to listen on for HTTP clients
-inactive-producer-timeout=5m0s: duration of time a producer will remain in the active list since its last ping
-tcp-address="0.0.0.0:4160": <addr>:<port> to listen on for TCP clients
-broadcast-address: external address of this lookupd node, (default to the OS hostname)
-tombstone-lifetime=45s: duration of time a producer will remain tombstoned if registration remains
-verbose=false: enable verbose logging
-version=false: print version string
Expand Down
30 changes: 18 additions & 12 deletions nsqlookupd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func tombstoneTopicProducerHandler(w http.ResponseWriter, req *http.Request) {
log.Printf("DB: setting tombstone for producer@%s of topic(%s)", node, topicName)
producers := lookupd.DB.FindProducers("topic", topicName, "")
for _, p := range producers {
thisNode := fmt.Sprintf("%s:%d", p.Address, p.HttpPort)
thisNode := fmt.Sprintf("%s:%d", p.BroadcastAddress, p.HttpPort)
if thisNode == node {
p.Tombstone()
}
Expand Down Expand Up @@ -238,23 +238,27 @@ func deleteChannelHandler(w http.ResponseWriter, req *http.Request) {

// note: we can't embed the *Producer here because embeded objects are ignored for json marshalling
type producerTopic struct {
Address string `json:"address"`
TcpPort int `json:"tcp_port"`
HttpPort int `json:"http_port"`
Version string `json:"version"`
Topics []string `json:"topics"`
Address string `json:"address"` //TODO: drop for 1.0
Hostname string `json:"hostname"`
BroadcastAddress string `json:"broadcast_address"`
TcpPort int `json:"tcp_port"`
HttpPort int `json:"http_port"`
Version string `json:"version"`
Topics []string `json:"topics"`
}

func nodesHandler(w http.ResponseWriter, req *http.Request) {
producers := lookupd.DB.FindProducers("client", "", "")
producerTopics := make([]*producerTopic, len(producers))
for i, p := range producers {
producerTopics[i] = &producerTopic{
Address: p.Address,
TcpPort: p.TcpPort,
HttpPort: p.HttpPort,
Version: p.Version,
Topics: lookupd.DB.LookupRegistrations(p).Filter("topic", "*", "").Keys(),
Address: p.Address, //TODO: drop for 1.0
Hostname: p.Hostname,
BroadcastAddress: p.BroadcastAddress,
TcpPort: p.TcpPort,
HttpPort: p.HttpPort,
Version: p.Version,
Topics: lookupd.DB.LookupRegistrations(p).Filter("topic", "*", "").Keys(),
}
}

Expand Down Expand Up @@ -282,7 +286,9 @@ func debugHandler(w http.ResponseWriter, req *http.Request) {
for _, p := range producers {
m := make(map[string]interface{})
m["producer_id"] = p.producerId
m["address"] = p.Address
m["address"] = p.Address //TODO: remove for 1.0
m["hostname"] = p.Hostname
m["broadcast_address"] = p.BroadcastAddress
m["tcp_port"] = p.TcpPort
m["http_port"] = p.HttpPort
m["version"] = p.Version
Expand Down
Loading

0 comments on commit 6b0285d

Please sign in to comment.