diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index 68025b9be..dc1582de7 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -55,6 +55,8 @@ type clientV2 struct { FinishCount uint64 RequeueCount uint64 + pubCounts map[string]uint64 + writeLock sync.RWMutex metaLock sync.RWMutex @@ -141,6 +143,8 @@ func newClientV2(id int64, conn net.Conn, ctx *context) *clientV2 { // heartbeats are client configurable but default to 30s HeartbeatInterval: ctx.nsqd.getOpts().ClientTimeout / 2, + + pubCounts: make(map[string]uint64), } c.lenSlice = c.lenBuf[:] return c @@ -211,6 +215,13 @@ func (c *clientV2) Stats() ClientStats { identity = c.AuthState.Identity identityURL = c.AuthState.IdentityURL } + pubCounts := make([]PubCount, 0, len(c.pubCounts)) + for topic, count := range c.pubCounts { + pubCounts = append(pubCounts, PubCount{ + Topic: topic, + Count: count, + }) + } c.metaLock.RUnlock() stats := ClientStats{ Version: "V2", @@ -232,6 +243,7 @@ func (c *clientV2) Stats() ClientStats { Authed: c.HasAuthorizations(), AuthIdentity: identity, AuthIdentityURL: identityURL, + PubCounts: pubCounts, } if stats.TLS { p := prettyConnectionState{c.tlsConn.ConnectionState()} @@ -243,6 +255,13 @@ func (c *clientV2) Stats() ClientStats { return stats } +func (c *clientV2) IsProducer() bool { + c.metaLock.RLock() + retval := len(c.pubCounts) > 0 + c.metaLock.RUnlock() + return retval +} + // struct to convert from integers to the human readable strings type prettyConnectionState struct { tls.ConnectionState @@ -343,6 +362,12 @@ func (c *clientV2) SendingMessage() { atomic.AddUint64(&c.MessageCount, 1) } +func (c *clientV2) PublishedMessage(topic string, count uint64) { + c.metaLock.Lock() + c.pubCounts[topic] += count + c.metaLock.Unlock() +} + func (c *clientV2) TimedOutMessage() { atomic.AddInt64(&c.InFlightCount, -1) c.tryUpdateReadyState() diff --git a/nsqd/http.go b/nsqd/http.go index da067798c..317054b8d 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -481,72 +481,98 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro channelName, _ := reqParams.Get("channel") jsonFormat := formatString == "json" + producerStats := s.ctx.nsqd.GetProducerStats() + stats := s.ctx.nsqd.GetStats(topicName, channelName) health := s.ctx.nsqd.GetHealth() startTime := s.ctx.nsqd.GetStartTime() uptime := time.Since(startTime) - // If we WERE given a topic-name, remove stats for all the other topics: + // filter by topic (if specified) if len(topicName) > 0 { - // Find the desired-topic-index: for _, topicStats := range stats { if topicStats.TopicName == topicName { - // If we WERE given a channel-name, remove stats for all the other channels: + // filter by channel (if specified) if len(channelName) > 0 { - // Find the desired-channel: for _, channelStats := range topicStats.Channels { if channelStats.ChannelName == channelName { topicStats.Channels = []ChannelStats{channelStats} - // We've got the channel we were looking for: break } } } - - // We've got the topic we were looking for: stats = []TopicStats{topicStats} break } } + + filteredProducerStats := make([]ClientStats, 0) + for _, clientStat := range producerStats { + var found bool + var count uint64 + for _, v := range clientStat.PubCounts { + if v.Topic == topicName { + count = v.Count + found = true + break + } + } + if !found { + continue + } + clientStat.PubCounts = []PubCount{PubCount{ + Topic: topicName, + Count: count, + }} + filteredProducerStats = append(filteredProducerStats, clientStat) + } + producerStats = filteredProducerStats } ms := getMemStats() if !jsonFormat { - return s.printStats(stats, ms, health, startTime, uptime), nil + return s.printStats(stats, producerStats, ms, health, startTime, uptime), nil } return struct { - Version string `json:"version"` - Health string `json:"health"` - StartTime int64 `json:"start_time"` - Topics []TopicStats `json:"topics"` - Memory memStats `json:"memory"` - }{version.Binary, health, startTime.Unix(), stats, ms}, nil + Version string `json:"version"` + Health string `json:"health"` + StartTime int64 `json:"start_time"` + Topics []TopicStats `json:"topics"` + Memory memStats `json:"memory"` + Producers []ClientStats `json:"producers"` + }{version.Binary, health, startTime.Unix(), stats, ms, producerStats}, nil } -func (s *httpServer) printStats(stats []TopicStats, ms memStats, health string, startTime time.Time, uptime time.Duration) []byte { +func (s *httpServer) printStats(stats []TopicStats, producerStats []ClientStats, ms memStats, health string, startTime time.Time, uptime time.Duration) []byte { var buf bytes.Buffer w := &buf + now := time.Now() - io.WriteString(w, fmt.Sprintf("%s\n", version.String("nsqd"))) - io.WriteString(w, fmt.Sprintf("start_time %v\n", startTime.Format(time.RFC3339))) - io.WriteString(w, fmt.Sprintf("uptime %s\n", uptime)) + + fmt.Fprintf(w, "%s\n", version.String("nsqd")) + fmt.Fprintf(w, "start_time %v\n", startTime.Format(time.RFC3339)) + fmt.Fprintf(w, "uptime %s\n", uptime) + + fmt.Fprintf(w, "\nHealth: %s\n", health) + + fmt.Fprintf(w, "\nMemory:\n") + fmt.Fprintf(w, " %-25s\t%d\n", "heap_objects", ms.HeapObjects) + fmt.Fprintf(w, " %-25s\t%d\n", "heap_idle_bytes", ms.HeapIdleBytes) + fmt.Fprintf(w, " %-25s\t%d\n", "heap_in_use_bytes", ms.HeapInUseBytes) + fmt.Fprintf(w, " %-25s\t%d\n", "heap_released_bytes", ms.HeapReleasedBytes) + fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_100", ms.GCPauseUsec100) + fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_99", ms.GCPauseUsec99) + fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_95", ms.GCPauseUsec95) + fmt.Fprintf(w, " %-25s\t%d\n", "next_gc_bytes", ms.NextGCBytes) + fmt.Fprintf(w, " %-25s\t%d\n", "gc_total_runs", ms.GCTotalRuns) + if len(stats) == 0 { - io.WriteString(w, "\nNO_TOPICS\n") - return buf.Bytes() + fmt.Fprintf(w, "\nTopics: None\n") + } else { + fmt.Fprintf(w, "\nTopics:") } - fmt.Fprintf(w, "\nMemory:\n") - fmt.Fprintf(w, " %-25s\t%d\n", "heap_objects", ms.HeapObjects) - fmt.Fprintf(w, " %-25s\t%d\n", "heap_idle_bytes", ms.HeapIdleBytes) - fmt.Fprintf(w, " %-25s\t%d\n", "heap_in_use_bytes", ms.HeapInUseBytes) - fmt.Fprintf(w, " %-25s\t%d\n", "heap_released_bytes", ms.HeapReleasedBytes) - fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_100", ms.GCPauseUsec100) - fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_99", ms.GCPauseUsec99) - fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_95", ms.GCPauseUsec95) - fmt.Fprintf(w, " %-25s\t%d\n", "next_gc_bytes", ms.NextGCBytes) - fmt.Fprintf(w, " %-25s\t%d\n", "gc_total_runs", ms.GCTotalRuns) - - io.WriteString(w, fmt.Sprintf("\nHealth: %s\n", health)) + for _, t := range stats { var pausedPrefix string if t.Paused { @@ -554,36 +580,37 @@ func (s *httpServer) printStats(stats []TopicStats, ms memStats, health string, } else { pausedPrefix = " " } - io.WriteString(w, fmt.Sprintf("\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n", + fmt.Fprintf(w, "\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n", pausedPrefix, t.TopicName, t.Depth, t.BackendDepth, t.MessageCount, - t.E2eProcessingLatency)) + t.E2eProcessingLatency, + ) for _, c := range t.Channels { if c.Paused { pausedPrefix = " *P " } else { pausedPrefix = " " } - io.WriteString(w, - fmt.Sprintf("%s[%-25s] depth: %-5d be-depth: %-5d inflt: %-4d def: %-4d re-q: %-5d timeout: %-5d msgs: %-8d e2e%%: %s\n", - pausedPrefix, - c.ChannelName, - c.Depth, - c.BackendDepth, - c.InFlightCount, - c.DeferredCount, - c.RequeueCount, - c.TimeoutCount, - c.MessageCount, - c.E2eProcessingLatency)) + fmt.Fprintf(w, "%s[%-25s] depth: %-5d be-depth: %-5d inflt: %-4d def: %-4d re-q: %-5d timeout: %-5d msgs: %-8d e2e%%: %s\n", + pausedPrefix, + c.ChannelName, + c.Depth, + c.BackendDepth, + c.InFlightCount, + c.DeferredCount, + c.RequeueCount, + c.TimeoutCount, + c.MessageCount, + c.E2eProcessingLatency, + ) for _, client := range c.Clients { connectTime := time.Unix(client.ConnectTime, 0) // truncate to the second duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second - io.WriteString(w, fmt.Sprintf(" [%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s\n", + fmt.Fprintf(w, " [%s %-21s] state: %d inflt: %-4d rdy: %-4d fin: %-8d re-q: %-8d msgs: %-8d connected: %s\n", client.Version, client.ClientID, client.State, @@ -593,10 +620,38 @@ func (s *httpServer) printStats(stats []TopicStats, ms memStats, health string, client.RequeueCount, client.MessageCount, duration, - )) + ) + } + } + } + + if len(producerStats) == 0 { + fmt.Fprintf(w, "\nProducers: None\n") + } else { + fmt.Fprintf(w, "\nProducers:") + for _, client := range producerStats { + connectTime := time.Unix(client.ConnectTime, 0) + // truncate to the second + duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second + var totalPubCount uint64 + for _, v := range client.PubCounts { + totalPubCount += v.Count + } + fmt.Fprintf(w, "\n [%s %-21s] msgs: %-8d connected: %s\n", + client.Version, + client.ClientID, + totalPubCount, + duration, + ) + for _, v := range client.PubCounts { + fmt.Fprintf(w, " [%-15s] msgs: %-8d\n", + v.Topic, + v.Count, + ) } } } + return buf.Bytes() } diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index e6e586818..ad9ea6758 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -39,6 +39,11 @@ type errStore struct { err error } +type Client interface { + Stats() ClientStats + IsProducer() bool +} + type NSQD struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms clientIDSequence int64 @@ -54,6 +59,9 @@ type NSQD struct { topicMap map[string]*Topic + clientLock sync.RWMutex + clients map[int64]Client + lookupPeers atomic.Value tcpListener net.Listener @@ -84,6 +92,7 @@ func New(opts *Options) *NSQD { n := &NSQD{ startTime: time.Now(), topicMap: make(map[string]*Topic), + clients: make(map[int64]Client), exitChan: make(chan int), notifyChan: make(chan interface{}), optsNotificationChan: make(chan struct{}, 1), @@ -215,6 +224,23 @@ func (n *NSQD) GetStartTime() time.Time { return n.startTime } +func (n *NSQD) AddClient(clientID int64, client Client) { + n.clientLock.Lock() + n.clients[clientID] = client + n.clientLock.Unlock() +} + +func (n *NSQD) RemoveClient(clientID int64) { + n.clientLock.Lock() + _, ok := n.clients[clientID] + if !ok { + n.clientLock.Unlock() + return + } + delete(n.clients, clientID) + n.clientLock.Unlock() +} + func (n *NSQD) Main() { var err error ctx := &context{n} diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index a767c5522..92bebcdd9 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -40,6 +40,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error { clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1) client := newClientV2(clientID, conn, p.ctx) + p.ctx.nsqd.AddClient(client.ID, client) // synchronize the startup of messagePump in order // to guarantee that it gets a chance to initialize @@ -117,6 +118,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error { client.Channel.RemoveClient(client.ID) } + p.ctx.nsqd.RemoveClient(client.ID) return err } @@ -799,6 +801,8 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error()) } + client.PublishedMessage(topicName, 1) + return okBytes, nil } @@ -850,6 +854,8 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) { return nil, protocol.NewFatalClientErr(err, "E_MPUB_FAILED", "MPUB failed "+err.Error()) } + client.PublishedMessage(topicName, uint64(len(messages))) + return okBytes, nil } @@ -912,6 +918,8 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { return nil, protocol.NewFatalClientErr(err, "E_DPUB_FAILED", "DPUB failed "+err.Error()) } + client.PublishedMessage(topicName, 1) + return okBytes, nil } diff --git a/nsqd/stats.go b/nsqd/stats.go index 8c3499b0e..881188979 100644 --- a/nsqd/stats.go +++ b/nsqd/stats.go @@ -71,6 +71,11 @@ func NewChannelStats(c *Channel, clients []ClientStats) ChannelStats { } } +type PubCount struct { + Topic string `json:"topic"` + Count uint64 `json:"count"` +} + type ClientStats struct { ClientID string `json:"client_id"` Hostname string `json:"hostname"` @@ -91,6 +96,8 @@ type ClientStats struct { AuthIdentity string `json:"auth_identity,omitempty"` AuthIdentityURL string `json:"auth_identity_url,omitempty"` + PubCounts []PubCount `json:"pub_counts,omitempty"` + TLS bool `json:"tls"` CipherSuite string `json:"tls_cipher_suite"` TLSVersion string `json:"tls_version"` @@ -168,6 +175,22 @@ func (n *NSQD) GetStats(topic string, channel string) []TopicStats { return topics } +func (n *NSQD) GetProducerStats() []ClientStats { + n.clientLock.RLock() + var producers []Client + for _, c := range n.clients { + if c.IsProducer() { + producers = append(producers, c) + } + } + n.clientLock.RUnlock() + producerStats := make([]ClientStats, 0, len(producers)) + for _, p := range producers { + producerStats = append(producerStats, p.Stats()) + } + return producerStats +} + type memStats struct { HeapObjects uint64 `json:"heap_objects"` HeapIdleBytes uint64 `json:"heap_idle_bytes"`