diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index 68025b9be..c3b17ad2a 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -54,6 +54,8 @@ type clientV2 struct { MessageCount uint64 FinishCount uint64 RequeueCount uint64 + // Only for producers + PublishCount uint64 writeLock sync.RWMutex metaLock sync.RWMutex @@ -224,6 +226,7 @@ func (c *clientV2) Stats() ClientStats { MessageCount: atomic.LoadUint64(&c.MessageCount), FinishCount: atomic.LoadUint64(&c.FinishCount), RequeueCount: atomic.LoadUint64(&c.RequeueCount), + PublishCount: atomic.LoadUint64(&c.PublishCount), ConnectTime: c.ConnectTime.Unix(), SampleRate: atomic.LoadInt32(&c.SampleRate), TLS: atomic.LoadInt32(&c.TLS) == 1, @@ -243,6 +246,10 @@ func (c *clientV2) Stats() ClientStats { return stats } +func (c *clientV2) IsProducer() bool { + return atomic.LoadUint64(&c.PublishCount) > 0 +} + // struct to convert from integers to the human readable strings type prettyConnectionState struct { tls.ConnectionState @@ -343,6 +350,10 @@ func (c *clientV2) SendingMessage() { atomic.AddUint64(&c.MessageCount, 1) } +func (c *clientV2) PublishedMessage(msgNum uint64) { + atomic.AddUint64(&c.PublishCount, msgNum) +} + func (c *clientV2) TimedOutMessage() { atomic.AddInt64(&c.InFlightCount, -1) c.tryUpdateReadyState() diff --git a/nsqd/http.go b/nsqd/http.go index da067798c..7afec1d40 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -479,8 +479,15 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro formatString, _ := reqParams.Get("format") topicName, _ := reqParams.Get("topic") channelName, _ := reqParams.Get("channel") + withProducers, _ := reqParams.Get("producers") jsonFormat := formatString == "json" + var producersStats []ClientStats + switch strings.ToLower(withProducers) { + case "1", "yes", "true": + producersStats = s.ctx.nsqd.GetProducerStats() + } + stats := s.ctx.nsqd.GetStats(topicName, channelName) health := s.ctx.nsqd.GetHealth() startTime := s.ctx.nsqd.GetStartTime() @@ -512,19 +519,20 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro ms := getMemStats() if !jsonFormat { - return s.printStats(stats, ms, health, startTime, uptime), nil + return s.printStats(stats, producersStats, ms, health, startTime, uptime), nil } return struct { - Version string `json:"version"` - Health string `json:"health"` - StartTime int64 `json:"start_time"` - Topics []TopicStats `json:"topics"` - Memory memStats `json:"memory"` - }{version.Binary, health, startTime.Unix(), stats, ms}, nil + Version string `json:"version"` + Health string `json:"health"` + StartTime int64 `json:"start_time"` + Topics []TopicStats `json:"topics"` + Memory memStats `json:"memory"` + Producers []ClientStats `json:"producers,omitempty"` + }{version.Binary, health, startTime.Unix(), stats, ms, producersStats}, nil } -func (s *httpServer) printStats(stats []TopicStats, ms memStats, health string, startTime time.Time, uptime time.Duration) []byte { +func (s *httpServer) printStats(stats []TopicStats, producersStats []ClientStats, ms memStats, health string, startTime time.Time, uptime time.Duration) []byte { var buf bytes.Buffer w := &buf now := time.Now() @@ -597,6 +605,21 @@ func (s *httpServer) printStats(stats []TopicStats, ms memStats, health string, } } } + + if len(producersStats) != 0 { + fmt.Fprintf(w, "\nProducers:\n") + for _, client := range producersStats { + connectTime := time.Unix(client.ConnectTime, 0) + // truncate to the second + duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second + io.WriteString(w, fmt.Sprintf(" [%s %-21s] msgs: %-8d connected: %s\n", + client.Version, + client.ClientID, + client.PublishCount, + duration, + )) + } + } return buf.Bytes() } diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 1649d5d25..b52bca570 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -39,6 +39,11 @@ type errStore struct { err error } +type Client interface { + Stats() ClientStats + IsProducer() bool +} + type NSQD struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms clientIDSequence int64 @@ -53,6 +58,7 @@ type NSQD struct { startTime time.Time topicMap map[string]*Topic + clients map[int64]Client lookupPeers atomic.Value @@ -84,6 +90,7 @@ func New(opts *Options) *NSQD { n := &NSQD{ startTime: time.Now(), topicMap: make(map[string]*Topic), + clients: make(map[int64]Client), exitChan: make(chan int), notifyChan: make(chan interface{}), optsNotificationChan: make(chan struct{}, 1), @@ -212,6 +219,25 @@ func (n *NSQD) GetStartTime() time.Time { return n.startTime } +// AddClient adds a client to the producers map +func (n *NSQD) AddClient(clientID int64, client Client) { + n.Lock() + n.clients[clientID] = client + n.Unlock() +} + +// RemoveClient removes a client from the producers map +func (n *NSQD) RemoveClient(clientID int64) { + n.Lock() + _, ok := n.clients[clientID] + if !ok { + n.Unlock() + return + } + delete(n.clients, clientID) + n.Unlock() +} + func (n *NSQD) Main() { var httpListener net.Listener var httpsListener net.Listener diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 48a4c9e3c..55633fe3f 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -40,6 +40,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error { clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1) client := newClientV2(clientID, conn, p.ctx) + p.ctx.nsqd.AddClient(client.ID, client) // synchronize the startup of messagePump in order // to guarantee that it gets a chance to initialize @@ -117,6 +118,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error { client.Channel.RemoveClient(client.ID) } + p.ctx.nsqd.RemoveClient(client.ID) return err } @@ -800,6 +802,8 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error()) } + client.PublishedMessage(1) + return okBytes, nil } @@ -851,6 +855,8 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) { return nil, protocol.NewFatalClientErr(err, "E_MPUB_FAILED", "MPUB failed "+err.Error()) } + client.PublishedMessage(uint64(len(messages))) + return okBytes, nil } @@ -913,6 +919,8 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { return nil, protocol.NewFatalClientErr(err, "E_DPUB_FAILED", "DPUB failed "+err.Error()) } + client.PublishedMessage(1) + return okBytes, nil } diff --git a/nsqd/stats.go b/nsqd/stats.go index a26fb6807..f96e880d1 100644 --- a/nsqd/stats.go +++ b/nsqd/stats.go @@ -75,6 +75,7 @@ type ClientStats struct { MessageCount uint64 `json:"message_count"` FinishCount uint64 `json:"finish_count"` RequeueCount uint64 `json:"requeue_count"` + PublishCount uint64 `json:"publish_count,omitempty"` ConnectTime int64 `json:"connect_ts"` SampleRate int32 `json:"sample_rate"` Deflate bool `json:"deflate"` @@ -161,6 +162,22 @@ func (n *NSQD) GetStats(topic string, channel string) []TopicStats { return topics } +func (n *NSQD) GetProducerStats() []ClientStats { + n.RLock() + var realProducers []Client + for _, c := range n.clients { + if c.IsProducer() { + realProducers = append(realProducers, c) + } + } + n.RUnlock() + producerStats := make([]ClientStats, 0, len(realProducers)) + for _, p := range realProducers { + producerStats = append(producerStats, p.Stats()) + } + return producerStats +} + type memStats struct { HeapObjects uint64 `json:"heap_objects"` HeapIdleBytes uint64 `json:"heap_idle_bytes"`