From 41ba852c9b18feda8184651a3b737793a9ba2a12 Mon Sep 17 00:00:00 2001 From: sparklxb Date: Mon, 10 Apr 2017 15:45:41 +0800 Subject: [PATCH] nsqd/nsqdmin: introspection of producer connections nsqd/nsqdmin: introspection of producer connections nsqd/nsqdmin: introspection of producer connections nsqd/nsqdmin: introspection of producer connections nsqd/nsqdmin: introspection of producer connections --- nsqd/client_v2.go | 11 +++++++++++ nsqd/http.go | 39 +++++++++++++++++++++++++++++++-------- nsqd/nsqd.go | 26 ++++++++++++++++++++++++++ nsqd/protocol_v2.go | 8 ++++++++ nsqd/stats.go | 17 +++++++++++++++++ 5 files changed, 93 insertions(+), 8 deletions(-) diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index 8d0ff8b46..f30df586c 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -54,6 +54,8 @@ type clientV2 struct { MessageCount uint64 FinishCount uint64 RequeueCount uint64 + // Only for producers + PublishCount uint64 writeLock sync.RWMutex metaLock sync.RWMutex @@ -224,6 +226,7 @@ func (c *clientV2) Stats() ClientStats { MessageCount: atomic.LoadUint64(&c.MessageCount), FinishCount: atomic.LoadUint64(&c.FinishCount), RequeueCount: atomic.LoadUint64(&c.RequeueCount), + PublishCount: atomic.LoadUint64(&c.PublishCount), ConnectTime: c.ConnectTime.Unix(), SampleRate: atomic.LoadInt32(&c.SampleRate), TLS: atomic.LoadInt32(&c.TLS) == 1, @@ -243,6 +246,10 @@ func (c *clientV2) Stats() ClientStats { return stats } +func (c *clientV2) IsProducer() bool { + return atomic.LoadUint64(&c.PublishCount) > 0 +} + // struct to convert from integers to the human readable strings type prettyConnectionState struct { tls.ConnectionState @@ -346,6 +353,10 @@ func (c *clientV2) SendingMessage() { atomic.AddUint64(&c.MessageCount, 1) } +func (c *clientV2) PublishedMessage(msgNum uint64) { + atomic.AddUint64(&c.PublishCount, msgNum) +} + func (c *clientV2) TimedOutMessage() { atomic.AddInt64(&c.InFlightCount, -1) c.tryUpdateReadyState() diff --git a/nsqd/http.go b/nsqd/http.go index 127ed58fb..9e9680ea1 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -464,8 +464,15 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro formatString, _ := reqParams.Get("format") topicName, _ := reqParams.Get("topic") channelName, _ := reqParams.Get("channel") + withProducers, _ := reqParams.Get("producers") jsonFormat := formatString == "json" + var producersStats []ClientStats + switch strings.ToLower(withProducers) { + case "1", "yes", "true": + producersStats = s.ctx.nsqd.GetProducerStats() + } + stats := s.ctx.nsqd.GetStats() health := s.ctx.nsqd.GetHealth() startTime := s.ctx.nsqd.GetStartTime() @@ -497,19 +504,20 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro ms := getMemStats() if !jsonFormat { - return s.printStats(stats, ms, health, startTime, uptime), nil + return s.printStats(stats, producersStats, ms, health, startTime, uptime), nil } return struct { - Version string `json:"version"` - Health string `json:"health"` - StartTime int64 `json:"start_time"` - Topics []TopicStats `json:"topics"` - Memory *memStats `json:"memory"` - }{version.Binary, health, startTime.Unix(), stats, ms}, nil + Version string `json:"version"` + Health string `json:"health"` + StartTime int64 `json:"start_time"` + Topics []TopicStats `json:"topics"` + Memory *memStats `json:"memory"` + Producers []ClientStats `json:"producers,omitempty"` + }{version.Binary, health, startTime.Unix(), stats, ms, producersStats}, nil } -func (s *httpServer) printStats(stats []TopicStats, ms *memStats, health string, startTime time.Time, uptime time.Duration) []byte { +func (s *httpServer) printStats(stats []TopicStats, producersStats []ClientStats, ms *memStats, health string, startTime time.Time, uptime time.Duration) []byte { var buf bytes.Buffer w := &buf now := time.Now() @@ -582,6 +590,21 @@ func (s *httpServer) printStats(stats []TopicStats, ms *memStats, health string, } } } + + if len(producersStats) != 0 { + fmt.Fprintf(w, "\nProducers:\n") + for _, client := range producersStats { + connectTime := time.Unix(client.ConnectTime, 0) + // truncate to the second + duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second + io.WriteString(w, fmt.Sprintf(" [%s %-21s] msgs: %-8d connected: %s\n", + client.Version, + client.ClientID, + client.PublishCount, + duration, + )) + } + } return buf.Bytes() } diff --git a/nsqd/nsqd.go b/nsqd/nsqd.go index 02ea507a7..a5844ec8e 100644 --- a/nsqd/nsqd.go +++ b/nsqd/nsqd.go @@ -38,6 +38,11 @@ type errStore struct { err error } +type Client interface { + Stats() ClientStats + IsProducer() bool +} + type NSQD struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms clientIDSequence int64 @@ -52,6 +57,7 @@ type NSQD struct { startTime time.Time topicMap map[string]*Topic + clients map[int64]Client lookupPeers atomic.Value @@ -83,6 +89,7 @@ func New(opts *Options) *NSQD { n := &NSQD{ startTime: time.Now(), topicMap: make(map[string]*Topic), + clients: make(map[int64]Client), exitChan: make(chan int), notifyChan: make(chan interface{}), optsNotificationChan: make(chan struct{}, 1), @@ -206,6 +213,25 @@ func (n *NSQD) GetStartTime() time.Time { return n.startTime } +// AddClient adds a client to the producers map +func (n *NSQD) AddClient(clientID int64, client Client) { + n.Lock() + n.clients[clientID] = client + n.Unlock() +} + +// RemoveClient removes a client from the producers map +func (n *NSQD) RemoveClient(clientID int64) { + n.Lock() + _, ok := n.clients[clientID] + if !ok { + n.Unlock() + return + } + delete(n.clients, clientID) + n.Unlock() +} + func (n *NSQD) Main() { var httpListener net.Listener var httpsListener net.Listener diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 02ba0dac3..7a65cf967 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -41,6 +41,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error { clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1) client := newClientV2(clientID, conn, p.ctx) + p.ctx.nsqd.AddClient(client.ID, client) // synchronize the startup of messagePump in order // to guarantee that it gets a chance to initialize @@ -120,6 +121,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error { client.Channel.RemoveClient(client.ID) } + p.ctx.nsqd.RemoveClient(client.ID) return err } @@ -796,6 +798,8 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error()) } + client.PublishedMessage(1) + return okBytes, nil } @@ -847,6 +851,8 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) { return nil, protocol.NewFatalClientErr(err, "E_MPUB_FAILED", "MPUB failed "+err.Error()) } + client.PublishedMessage(uint64(len(messages))) + return okBytes, nil } @@ -909,6 +915,8 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { return nil, protocol.NewFatalClientErr(err, "E_DPUB_FAILED", "DPUB failed "+err.Error()) } + client.PublishedMessage(1) + return okBytes, nil } diff --git a/nsqd/stats.go b/nsqd/stats.go index 6f732fa82..e8ad66676 100644 --- a/nsqd/stats.go +++ b/nsqd/stats.go @@ -75,6 +75,7 @@ type ClientStats struct { MessageCount uint64 `json:"message_count"` FinishCount uint64 `json:"finish_count"` RequeueCount uint64 `json:"requeue_count"` + PublishCount uint64 `json:"publish_count,omitempty"` ConnectTime int64 `json:"connect_ts"` SampleRate int32 `json:"sample_rate"` Deflate bool `json:"deflate"` @@ -145,6 +146,22 @@ func (n *NSQD) GetStats() []TopicStats { return topics } +func (n *NSQD) GetProducerStats() []ClientStats { + n.RLock() + var realProducers []Client + for _, c := range n.clients { + if c.IsProducer() { + realProducers = append(realProducers, c) + } + } + n.RUnlock() + producerStats := make([]ClientStats, 0, len(realProducers)) + for _, p := range realProducers { + producerStats = append(producerStats, p.Stats()) + } + return producerStats +} + type memStats struct { HeapObjects uint64 `json:"heap_objects"` HeapIdleBytes uint64 `json:"heap_idle_bytes"`