Skip to content

Commit

Permalink
nsqd/nsqdmin: introspection of producer connections
Browse files Browse the repository at this point in the history
nsqd/nsqdmin: introspection of producer connections

nsqd/nsqdmin: introspection of producer connections

nsqd/nsqdmin: introspection of producer connections

nsqd/nsqdmin: introspection of producer connections
  • Loading branch information
sparklxb committed Apr 16, 2017
1 parent ad70e25 commit 41ba852
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 8 deletions.
11 changes: 11 additions & 0 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type clientV2 struct {
MessageCount uint64
FinishCount uint64
RequeueCount uint64
// Only for producers
PublishCount uint64

writeLock sync.RWMutex
metaLock sync.RWMutex
Expand Down Expand Up @@ -224,6 +226,7 @@ func (c *clientV2) Stats() ClientStats {
MessageCount: atomic.LoadUint64(&c.MessageCount),
FinishCount: atomic.LoadUint64(&c.FinishCount),
RequeueCount: atomic.LoadUint64(&c.RequeueCount),
PublishCount: atomic.LoadUint64(&c.PublishCount),
ConnectTime: c.ConnectTime.Unix(),
SampleRate: atomic.LoadInt32(&c.SampleRate),
TLS: atomic.LoadInt32(&c.TLS) == 1,
Expand All @@ -243,6 +246,10 @@ func (c *clientV2) Stats() ClientStats {
return stats
}

func (c *clientV2) IsProducer() bool {
return atomic.LoadUint64(&c.PublishCount) > 0
}

// struct to convert from integers to the human readable strings
type prettyConnectionState struct {
tls.ConnectionState
Expand Down Expand Up @@ -346,6 +353,10 @@ func (c *clientV2) SendingMessage() {
atomic.AddUint64(&c.MessageCount, 1)
}

func (c *clientV2) PublishedMessage(msgNum uint64) {
atomic.AddUint64(&c.PublishCount, msgNum)
}

func (c *clientV2) TimedOutMessage() {
atomic.AddInt64(&c.InFlightCount, -1)
c.tryUpdateReadyState()
Expand Down
39 changes: 31 additions & 8 deletions nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,15 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro
formatString, _ := reqParams.Get("format")
topicName, _ := reqParams.Get("topic")
channelName, _ := reqParams.Get("channel")
withProducers, _ := reqParams.Get("producers")
jsonFormat := formatString == "json"

var producersStats []ClientStats
switch strings.ToLower(withProducers) {
case "1", "yes", "true":
producersStats = s.ctx.nsqd.GetProducerStats()
}

stats := s.ctx.nsqd.GetStats()
health := s.ctx.nsqd.GetHealth()
startTime := s.ctx.nsqd.GetStartTime()
Expand Down Expand Up @@ -497,19 +504,20 @@ func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httpro

ms := getMemStats()
if !jsonFormat {
return s.printStats(stats, ms, health, startTime, uptime), nil
return s.printStats(stats, producersStats, ms, health, startTime, uptime), nil
}

return struct {
Version string `json:"version"`
Health string `json:"health"`
StartTime int64 `json:"start_time"`
Topics []TopicStats `json:"topics"`
Memory *memStats `json:"memory"`
}{version.Binary, health, startTime.Unix(), stats, ms}, nil
Version string `json:"version"`
Health string `json:"health"`
StartTime int64 `json:"start_time"`
Topics []TopicStats `json:"topics"`
Memory *memStats `json:"memory"`
Producers []ClientStats `json:"producers,omitempty"`
}{version.Binary, health, startTime.Unix(), stats, ms, producersStats}, nil
}

func (s *httpServer) printStats(stats []TopicStats, ms *memStats, health string, startTime time.Time, uptime time.Duration) []byte {
func (s *httpServer) printStats(stats []TopicStats, producersStats []ClientStats, ms *memStats, health string, startTime time.Time, uptime time.Duration) []byte {
var buf bytes.Buffer
w := &buf
now := time.Now()
Expand Down Expand Up @@ -582,6 +590,21 @@ func (s *httpServer) printStats(stats []TopicStats, ms *memStats, health string,
}
}
}

if len(producersStats) != 0 {
fmt.Fprintf(w, "\nProducers:\n")
for _, client := range producersStats {
connectTime := time.Unix(client.ConnectTime, 0)
// truncate to the second
duration := time.Duration(int64(now.Sub(connectTime).Seconds())) * time.Second
io.WriteString(w, fmt.Sprintf(" [%s %-21s] msgs: %-8d connected: %s\n",
client.Version,
client.ClientID,
client.PublishCount,
duration,
))
}
}
return buf.Bytes()
}

Expand Down
26 changes: 26 additions & 0 deletions nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ type errStore struct {
err error
}

type Client interface {
Stats() ClientStats
IsProducer() bool
}

type NSQD struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
clientIDSequence int64
Expand All @@ -52,6 +57,7 @@ type NSQD struct {
startTime time.Time

topicMap map[string]*Topic
clients map[int64]Client

lookupPeers atomic.Value

Expand Down Expand Up @@ -83,6 +89,7 @@ func New(opts *Options) *NSQD {
n := &NSQD{
startTime: time.Now(),
topicMap: make(map[string]*Topic),
clients: make(map[int64]Client),
exitChan: make(chan int),
notifyChan: make(chan interface{}),
optsNotificationChan: make(chan struct{}, 1),
Expand Down Expand Up @@ -206,6 +213,25 @@ func (n *NSQD) GetStartTime() time.Time {
return n.startTime
}

// AddClient adds a client to the producers map
func (n *NSQD) AddClient(clientID int64, client Client) {
n.Lock()
n.clients[clientID] = client
n.Unlock()
}

// RemoveClient removes a client from the producers map
func (n *NSQD) RemoveClient(clientID int64) {
n.Lock()
_, ok := n.clients[clientID]
if !ok {
n.Unlock()
return
}
delete(n.clients, clientID)
n.Unlock()
}

func (n *NSQD) Main() {
var httpListener net.Listener
var httpsListener net.Listener
Expand Down
8 changes: 8 additions & 0 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error {

clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
client := newClientV2(clientID, conn, p.ctx)
p.ctx.nsqd.AddClient(client.ID, client)

// synchronize the startup of messagePump in order
// to guarantee that it gets a chance to initialize
Expand Down Expand Up @@ -120,6 +121,7 @@ func (p *protocolV2) IOLoop(conn net.Conn) error {
client.Channel.RemoveClient(client.ID)
}

p.ctx.nsqd.RemoveClient(client.ID)
return err
}

Expand Down Expand Up @@ -796,6 +798,8 @@ func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
}

client.PublishedMessage(1)

return okBytes, nil
}

Expand Down Expand Up @@ -847,6 +851,8 @@ func (p *protocolV2) MPUB(client *clientV2, params [][]byte) ([]byte, error) {
return nil, protocol.NewFatalClientErr(err, "E_MPUB_FAILED", "MPUB failed "+err.Error())
}

client.PublishedMessage(uint64(len(messages)))

return okBytes, nil
}

Expand Down Expand Up @@ -909,6 +915,8 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {
return nil, protocol.NewFatalClientErr(err, "E_DPUB_FAILED", "DPUB failed "+err.Error())
}

client.PublishedMessage(1)

return okBytes, nil
}

Expand Down
17 changes: 17 additions & 0 deletions nsqd/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type ClientStats struct {
MessageCount uint64 `json:"message_count"`
FinishCount uint64 `json:"finish_count"`
RequeueCount uint64 `json:"requeue_count"`
PublishCount uint64 `json:"publish_count,omitempty"`
ConnectTime int64 `json:"connect_ts"`
SampleRate int32 `json:"sample_rate"`
Deflate bool `json:"deflate"`
Expand Down Expand Up @@ -145,6 +146,22 @@ func (n *NSQD) GetStats() []TopicStats {
return topics
}

func (n *NSQD) GetProducerStats() []ClientStats {
n.RLock()
var realProducers []Client
for _, c := range n.clients {
if c.IsProducer() {
realProducers = append(realProducers, c)
}
}
n.RUnlock()
producerStats := make([]ClientStats, 0, len(realProducers))
for _, p := range realProducers {
producerStats = append(producerStats, p.Stats())
}
return producerStats
}

type memStats struct {
HeapObjects uint64 `json:"heap_objects"`
HeapIdleBytes uint64 `json:"heap_idle_bytes"`
Expand Down

0 comments on commit 41ba852

Please sign in to comment.