From 996be6d66d201625c932301ee8d306e5dc08a79f Mon Sep 17 00:00:00 2001
From: sparklxb
Date: Mon, 10 Apr 2017 15:45:41 +0800
Subject: [PATCH] nsqd: introspection of producer connections

---
Review notes (commentary only, placed in the free-text area that precedes
the diffstat). Line breaks, indentation and column alignment below were
reconstructed from a whitespace-collapsed copy of this patch; verify the
whitespace of context lines against the target tree before applying.

  * client_v2.go: each clientV2 gets a per-topic publish counter map
    (pubCounts). PublishedMessage adds to it under the metaLock write
    lock, IsProducer reports whether it is non-empty, and Stats copies it
    into ClientStats.PubCounts while metaLock is held for reading.
  * nsqd.go: NSQD gains a clients map keyed by client ID, guarded by
    clientLock, plus AddClient/RemoveClient. The Client interface is the
    Stats/IsProducer pair used by GetProducerStats.
  * protocol_v2.go: IOLoop calls AddClient right after newClientV2 and
    RemoveClient just before returning. PUB, MPUB and DPUB call
    PublishedMessage after their error check, with 1 or len(messages).
  * stats.go: adds PubCount and ClientStats.PubCounts. GetProducerStats
    collects the clients whose IsProducer is true while holding clientLock
    for reading, then calls Stats on each of them after releasing it.
  * http.go: doStats adds a "producers" field to the JSON response and
    printStats appends a "Producers:" section to the text output. Inside
    the existing filter block (its condition is outside the hunk), only
    producers with a PubCounts entry for topicName are kept, and their
    PubCounts is narrowed to that single entry.

  NOTE(review): RemoveClient looks the key up before calling delete.
  delete on a key that is not present is a no-op, so the lookup and the
  early return are redundant.
  NOTE(review): the literal []PubCount{PubCount{...}} in doStats repeats
  the element type; "gofmt -s" would simplify it to []PubCount{{...}}.
  NOTE(review): PublishedMessage takes the metaLock write lock on every
  PUB/MPUB/DPUB of a connection; confirm this is acceptable on
  high-rate publish paths.
  NOTE(review): filteredProducerStats is created with make(..., 0) even
  though len(producerStats) is a known upper bound for its size.

 nsqd/client_v2.go   | 25 ++++++++++++++++
 nsqd/http.go        | 69 +++++++++++++++++++++++++++++++++++++++------
 nsqd/nsqd.go        | 26 +++++++++++++++++
 nsqd/protocol_v2.go |  8 ++++++
 nsqd/stats.go       | 23 +++++++++++++++
 5 files changed, 143 insertions(+), 8 deletions(-)

diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go
index 68025b9be..dc1582de7 100644
--- a/nsqd/client_v2.go
+++ b/nsqd/client_v2.go
@@ -55,6 +55,8 @@ type clientV2 struct {
 	FinishCount   uint64
 	RequeueCount  uint64
 
+	pubCounts map[string]uint64
+
 	writeLock sync.RWMutex
 	metaLock  sync.RWMutex
 
@@ -141,6 +143,8 @@ func newClientV2(id int64, conn net.Conn, ctx *context) *clientV2 {
 
 		// heartbeats are client configurable but default to 30s
 		HeartbeatInterval: ctx.nsqd.getOpts().ClientTimeout / 2,
+
+		pubCounts: make(map[string]uint64),
 	}
 	c.lenSlice = c.lenBuf[:]
 	return c
@@ -211,6 +215,13 @@ func (c *clientV2) Stats() ClientStats {
 		identity = c.AuthState.Identity
 		identityURL = c.AuthState.IdentityURL
 	}
+	pubCounts := make([]PubCount, 0, len(c.pubCounts))
+	for topic, count := range c.pubCounts {
+		pubCounts = append(pubCounts, PubCount{
+			Topic: topic,
+			Count: count,
+		})
+	}
 	c.metaLock.RUnlock()
 	stats := ClientStats{
 		Version:         "V2",
@@ -232,6 +243,7 @@ func (c *clientV2) Stats() ClientStats {
 		Authed:          c.HasAuthorizations(),
 		AuthIdentity:    identity,
 		AuthIdentityURL: identityURL,
+		PubCounts:       pubCounts,
 	}
 	if stats.TLS {
 		p := prettyConnectionState{c.tlsConn.ConnectionState()}
@@ -243,6 +255,13 @@ func (c *clientV2) Stats() ClientStats {
 	return stats
 }
 
+func (c *clientV2) IsProducer() bool {
+	c.metaLock.RLock()
+	retval := len(c.pubCounts) > 0
+	c.metaLock.RUnlock()
+	return retval
+}
+
 // struct to convert from integers to the human readable strings
 type prettyConnectionState struct {
 	tls.ConnectionState
@@ -343,6 +362,12 @@ func (c *clientV2) SendingMessage() {
 	atomic.AddUint64(&c.MessageCount, 1)
 }
 
+func (c *clientV2) PublishedMessage(topic string, count uint64) {
+	c.metaLock.Lock()
+	c.pubCounts[topic] += count
+	c.metaLock.Unlock()
+}
+
 func (c *clientV2) TimedOutMessage() {
 	atomic.AddInt64(&c.InFlightCount, -1)
 	c.tryUpdateReadyState()
diff --git a/nsqd/http.go b/nsqd/http.go
index 0a9728331..317054b8d 100644
--- a/nsqd/http.go
+++ b/nsqd/http.go
@@ -481,6 +481,8 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro
 	channelName, _ := reqParams.Get("channel")
 	jsonFormat := formatString == "json"
 
+	producerStats := s.ctx.nsqd.GetProducerStats()
+
 	stats := s.ctx.nsqd.GetStats(topicName, channelName)
 	health := s.ctx.nsqd.GetHealth()
 	startTime := s.ctx.nsqd.GetStartTime()
@@ -503,23 +505,46 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro
 				break
 			}
 		}
+
+		filteredProducerStats := make([]ClientStats, 0)
+		for _, clientStat := range producerStats {
+			var found bool
+			var count uint64
+			for _, v := range clientStat.PubCounts {
+				if v.Topic == topicName {
+					count = v.Count
+					found = true
+					break
+				}
+			}
+			if !found {
+				continue
+			}
+			clientStat.PubCounts = []PubCount{PubCount{
+				Topic: topicName,
+				Count: count,
+			}}
+			filteredProducerStats = append(filteredProducerStats, clientStat)
+		}
+		producerStats = filteredProducerStats
 	}
 
 	ms := getMemStats()
 	if !jsonFormat {
-		return s.printStats(stats, ms, health, startTime, uptime), nil
+		return s.printStats(stats, producerStats, ms, health, startTime, uptime), nil
 	}
 
 	return struct {
-		Version   string       `json:"version"`
-		Health    string       `json:"health"`
-		StartTime int64        `json:"start_time"`
-		Topics    []TopicStats `json:"topics"`
-		Memory    memStats     `json:"memory"`
-	}{version.Binary, health, startTime.Unix(), stats, ms}, nil
+		Version   string        `json:"version"`
+		Health    string        `json:"health"`
+		StartTime int64         `json:"start_time"`
+		Topics    []TopicStats  `json:"topics"`
+		Memory    memStats      `json:"memory"`
+		Producers []ClientStats `json:"producers"`
+	}{version.Binary, health, startTime.Unix(), stats, ms, producerStats}, nil
 }
 
-func (s *httpServer) printStats(stats []TopicStats, ms memStats, health string, startTime time.Time, uptime time.Duration) []byte {
+func (s *httpServer) printStats(stats []TopicStats, producerStats []ClientStats, ms memStats, health string, startTime time.Time, uptime time.Duration) []byte {
 	var buf bytes.Buffer
 	w := &buf
 
@@ -599,6 +624,34 @@ func (s *httpServer) printStats(stats []TopicStats, ms memStats, health string,
 			}
 		}
 	}
+
+	if len(producerStats) == 0 {
+		fmt.Fprintf(w, "\nProducers: None\n")
+	} else {
+		fmt.Fprintf(w, "\nProducers:")
+		for _, client := range producerStats {
+			connectTime := time.Unix(client.ConnectTime, 0)
+			// truncate to the second
+			duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second
+			var totalPubCount uint64
+			for _, v := range client.PubCounts {
+				totalPubCount += v.Count
+			}
+			fmt.Fprintf(w, "\n [%s %-21s] msgs: %-8d connected: %s\n",
+				client.Version,
+				client.ClientID,
+				totalPubCount,
+				duration,
+			)
+			for _, v := range client.PubCounts {
+				fmt.Fprintf(w, " [%-15s] msgs: %-8d\n",
+					v.Topic,
+					v.Count,
+				)
+			}
+		}
+	}
+
 	return buf.Bytes()
 }
 
diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go
index e6e586818..ad9ea6758 100644
--- a/nsqd/nsqd.go
+++ b/nsqd/nsqd.go
@@ -39,6 +39,11 @@ type errStore struct {
 	err error
 }
 
+type Client interface {
+	Stats() ClientStats
+	IsProducer() bool
+}
+
 type NSQD struct {
 	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
 	clientIDSequence int64
@@ -54,6 +59,9 @@ type NSQD struct {
 
 	topicMap map[string]*Topic
 
+	clientLock sync.RWMutex
+	clients    map[int64]Client
+
 	lookupPeers atomic.Value
 
 	tcpListener   net.Listener
@@ -84,6 +92,7 @@ func New(opts *Options) *NSQD {
 	n := &NSQD{
 		startTime:            time.Now(),
 		topicMap:             make(map[string]*Topic),
+		clients:              make(map[int64]Client),
 		exitChan:             make(chan int),
 		notifyChan:           make(chan interface{}),
 		optsNotificationChan: make(chan struct{}, 1),
@@ -215,6 +224,23 @@ func (n *NSQD) GetStartTime() time.Time {
 	return n.startTime
 }
 
+func (n *NSQD) AddClient(clientID int64, client Client) {
+	n.clientLock.Lock()
+	n.clients[clientID] = client
+	n.clientLock.Unlock()
+}
+
+func (n *NSQD) RemoveClient(clientID int64) {
+	n.clientLock.Lock()
+	_, ok := n.clients[clientID]
+	if !ok {
+		n.clientLock.Unlock()
+		return
+	}
+	delete(n.clients, clientID)
+	n.clientLock.Unlock()
+}
+
 func (n *NSQD) Main() {
 	var err error
 	ctx := &context{n}
diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go
index a767c5522..92bebcdd9 100644
--- a/nsqd/protocol_v2.go
+++ b/nsqd/protocol_v2.go
@@ -40,6 +40,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error {
 
 	clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
 	client := newClientV2(clientID, conn, p.ctx)
+	p.ctx.nsqd.AddClient(client.ID, client)
 
 	// synchronize the startup of messagePump in order
 	// to guarantee that it gets a chance to initialize
@@ -117,6 +118,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error {
 		client.Channel.RemoveClient(client.ID)
 	}
 
+	p.ctx.nsqd.RemoveClient(client.ID)
 	return err
 }
 
@@ -799,6 +801,8 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
 		return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
 	}
 
+	client.PublishedMessage(topicName, 1)
+
 	return okBytes, nil
 }
 
@@ -850,6 +854,8 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) {
 		return nil, protocol.NewFatalClientErr(err, "E_MPUB_FAILED", "MPUB failed "+err.Error())
 	}
 
+	client.PublishedMessage(topicName, uint64(len(messages)))
+
 	return okBytes, nil
 }
 
@@ -912,6 +918,8 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {
 		return nil, protocol.NewFatalClientErr(err, "E_DPUB_FAILED", "DPUB failed "+err.Error())
 	}
 
+	client.PublishedMessage(topicName, 1)
+
 	return okBytes, nil
 }
 
diff --git a/nsqd/stats.go b/nsqd/stats.go
index 8c3499b0e..881188979 100644
--- a/nsqd/stats.go
+++ b/nsqd/stats.go
@@ -71,6 +71,11 @@ func NewChannelStats(c *Channel, clients []ClientStats) ChannelStats {
 	}
 }
 
+type PubCount struct {
+	Topic string `json:"topic"`
+	Count uint64 `json:"count"`
+}
+
 type ClientStats struct {
 	ClientID        string `json:"client_id"`
 	Hostname        string `json:"hostname"`
@@ -91,6 +96,8 @@ type ClientStats struct {
 	AuthIdentity    string `json:"auth_identity,omitempty"`
 	AuthIdentityURL string `json:"auth_identity_url,omitempty"`
 
+	PubCounts []PubCount `json:"pub_counts,omitempty"`
+
 	TLS                           bool   `json:"tls"`
 	CipherSuite                   string `json:"tls_cipher_suite"`
 	TLSVersion                    string `json:"tls_version"`
@@ -168,6 +175,22 @@ func (n *NSQD) GetStats(topic string, channel string) []TopicStats {
 	return topics
 }
 
+func (n *NSQD) GetProducerStats() []ClientStats {
+	n.clientLock.RLock()
+	var producers []Client
+	for _, c := range n.clients {
+		if c.IsProducer() {
+			producers = append(producers, c)
+		}
+	}
+	n.clientLock.RUnlock()
+	producerStats := make([]ClientStats, 0, len(producers))
+	for _, p := range producers {
+		producerStats = append(producerStats, p.Stats())
+	}
+	return producerStats
+}
+
 type memStats struct {
 	HeapObjects       uint64 `json:"heap_objects"`
 	HeapIdleBytes     uint64 `json:"heap_idle_bytes"`