Skip to content

Commit

Permalink
Make NSQ plugin compatible with version 0.10.0
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Jan 15, 2016
1 parent dbbb2d9 commit 7a8d1b7
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 88 deletions.
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
_ "github.com/influxdb/telegraf/plugins/inputs/mongodb"
_ "github.com/influxdb/telegraf/plugins/inputs/mysql"
_ "github.com/influxdb/telegraf/plugins/inputs/nginx"
_ "github.com/influxdb/telegraf/plugins/inputs/nsq"
_ "github.com/influxdb/telegraf/plugins/inputs/phpfpm"
_ "github.com/influxdb/telegraf/plugins/inputs/ping"
_ "github.com/influxdb/telegraf/plugins/inputs/postgresql"
Expand Down
109 changes: 60 additions & 49 deletions plugins/inputs/nsq/nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"sync"
"time"

"github.com/influxdb/telegraf/plugins"
"github.com/influxdb/telegraf/plugins/inputs"
)

// Might add Lookupd endpoints for cluster discovery
Expand All @@ -41,15 +41,15 @@ type NSQ struct {

var sampleConfig = `
# An array of NSQD HTTP API endpoints
endpoints = ["http://localhost:4151","http://otherhost:4151"]
endpoints = ["http://localhost:4151"]
`

const (
requestPattern = `%s/stats?format=json`
)

func init() {
plugins.Add("nsq", func() plugins.Plugin {
inputs.Add("nsq", func() inputs.Input {
return &NSQ{}
})
}
Expand All @@ -62,7 +62,7 @@ func (n *NSQ) Description() string {
return "Read NSQ topic and channel statistics."
}

func (n *NSQ) Gather(acc plugins.Accumulator) error {
func (n *NSQ) Gather(acc inputs.Accumulator) error {
var wg sync.WaitGroup
var outerr error

Expand All @@ -85,7 +85,7 @@ var tr = &http.Transport{

var client = &http.Client{Transport: tr}

func (n *NSQ) gatherEndpoint(e string, acc plugins.Accumulator) error {
func (n *NSQ) gatherEndpoint(e string, acc inputs.Accumulator) error {
u, err := buildURL(e)
if err != nil {
return err
Expand All @@ -111,13 +111,15 @@ func (n *NSQ) gatherEndpoint(e string, acc plugins.Accumulator) error {
`server_version`: s.Data.Version,
}

fields := make(map[string]interface{})
if s.Data.Health == `OK` {
acc.Add(`nsq_server_count`, int64(1), tags)
fields["server_count"] = int64(1)
} else {
acc.Add(`nsq_server_count`, int64(0), tags)
fields["server_count"] = int64(0)
}
fields["topic_count"] = int64(len(s.Data.Topics))

acc.Add(`nsq_server_topic_count`, int64(len(s.Data.Topics)), tags)
acc.AddFields("nsq_server", fields, tags)
for _, t := range s.Data.Topics {
topicStats(t, acc, u.Host, s.Data.Version)
}
Expand All @@ -134,68 +136,77 @@ func buildURL(e string) (*url.URL, error) {
return addr, nil
}

func topicStats(t TopicStats, acc plugins.Accumulator, host, version string) {

func topicStats(t TopicStats, acc inputs.Accumulator, host, version string) {
// per topic overall (tag: name, paused, channel count)
tags := map[string]string{
`server_host`: host,
`server_version`: version,
`topic`: t.Name,
"server_host": host,
"server_version": version,
"topic": t.Name,
}

acc.Add(`nsq_topic_depth`, t.Depth, tags)
acc.Add(`nsq_topic_backend_depth`, t.BackendDepth, tags)
acc.Add(`nsq_topic_message_count`, t.MessageCount, tags)
fields := map[string]interface{}{
"depth": t.Depth,
"backend_depth": t.BackendDepth,
"message_count": t.MessageCount,
"channel_count": int64(len(t.Channels)),
}
acc.AddFields("nsq_topic", fields, tags)

acc.Add(`nsq_topic_channel_count`, int64(len(t.Channels)), tags)
for _, c := range t.Channels {
channelStats(c, acc, host, version, t.Name)
}
}

func channelStats(c ChannelStats, acc plugins.Accumulator, host, version, topic string) {
func channelStats(c ChannelStats, acc inputs.Accumulator, host, version, topic string) {
tags := map[string]string{
`server_host`: host,
`server_version`: version,
`topic`: topic,
`channel`: c.Name,
"server_host": host,
"server_version": version,
"topic": topic,
"channel": c.Name,
}

acc.Add("nsq_channel_depth", c.Depth, tags)
acc.Add("nsq_channel_backend_depth", c.BackendDepth, tags)
acc.Add("nsq_channel_inflight_count", c.InFlightCount, tags)
acc.Add("nsq_channel_deferred_count", c.DeferredCount, tags)
acc.Add("nsq_channel_message_count", c.MessageCount, tags)
acc.Add("nsq_channel_requeue_count", c.RequeueCount, tags)
acc.Add("nsq_channel_timeout_count", c.TimeoutCount, tags)
fields := map[string]interface{}{
"depth": c.Depth,
"backend_depth": c.BackendDepth,
"inflight_count": c.InFlightCount,
"deferred_count": c.DeferredCount,
"message_count": c.MessageCount,
"requeue_count": c.RequeueCount,
"timeout_count": c.TimeoutCount,
"client_count": int64(len(c.Clients)),
}

acc.Add("nsq_channel_client_count", int64(len(c.Clients)), tags)
acc.AddFields("nsq_channel", fields, tags)
for _, cl := range c.Clients {
clientStats(cl, acc, host, version, topic, c.Name)
}
}

func clientStats(c ClientStats, acc plugins.Accumulator, host, version, topic, channel string) {
func clientStats(c ClientStats, acc inputs.Accumulator, host, version, topic, channel string) {
tags := map[string]string{
`server_host`: host,
`server_version`: version,
`topic`: topic,
`channel`: channel,
`client_name`: c.Name,
`client_id`: c.ID,
`client_hostname`: c.Hostname,
`client_version`: c.Version,
`client_address`: c.RemoteAddress,
`client_user_agent`: c.UserAgent,
`client_tls`: strconv.FormatBool(c.TLS),
`client_snappy`: strconv.FormatBool(c.Snappy),
`client_deflate`: strconv.FormatBool(c.Deflate),
"server_host": host,
"server_version": version,
"topic": topic,
"channel": channel,
"client_name": c.Name,
"client_id": c.ID,
"client_hostname": c.Hostname,
"client_version": c.Version,
"client_address": c.RemoteAddress,
"client_user_agent": c.UserAgent,
"client_tls": strconv.FormatBool(c.TLS),
"client_snappy": strconv.FormatBool(c.Snappy),
"client_deflate": strconv.FormatBool(c.Deflate),
}

fields := map[string]interface{}{
"ready_count": c.ReadyCount,
"inflight_count": c.InFlightCount,
"message_count": c.MessageCount,
"finish_count": c.FinishCount,
"requeue_count": c.RequeueCount,
}
acc.Add("nsq_client_ready_count", c.ReadyCount, tags)
acc.Add("nsq_client_inflight_count", c.InFlightCount, tags)
acc.Add("nsq_client_message_count", c.MessageCount, tags)
acc.Add("nsq_client_finish_count", c.FinishCount, tags)
acc.Add("nsq_client_requeue_count", c.RequeueCount, tags)
acc.AddFields("nsq_client", fields, tags)
}

type NSQStats struct {
Expand Down
149 changes: 110 additions & 39 deletions plugins/inputs/nsq/nsq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/influxdb/telegraf/testutil"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand All @@ -35,49 +34,121 @@ func TestNSQStats(t *testing.T) {
// actually validate the tests
tests := []struct {
m string
v int64
f map[string]interface{}
g map[string]string
}{
{`nsq_server_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`}},
{`nsq_server_topic_count`, int64(2), map[string]string{`server_host`: host, `server_version`: `0.3.6`}},
{`nsq_topic_depth`, int64(12), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}},
{`nsq_topic_backend_depth`, int64(13), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}},
{`nsq_topic_message_count`, int64(14), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}},
{`nsq_topic_channel_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`}},
{`nsq_channel_depth`, int64(0), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}},
{`nsq_channel_backend_depth`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}},
{`nsq_channel_inflight_count`, int64(2), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}},
{`nsq_channel_deferred_count`, int64(3), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}},
{`nsq_channel_message_count`, int64(4), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}},
{`nsq_channel_requeue_count`, int64(5), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}},
{`nsq_channel_timeout_count`, int64(6), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}},
{`nsq_channel_client_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`}},
{`nsq_client_ready_count`, int64(200), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}},
{`nsq_client_inflight_count`, int64(7), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}},
{`nsq_client_message_count`, int64(8), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}},
{`nsq_client_finish_count`, int64(9), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}},
{`nsq_client_requeue_count`, int64(10), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`, `client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`, `client_version`: `V2`, `client_address`: `172.17.0.11:35560`, `client_tls`: `false`, `client_snappy`: `false`, `client_deflate`: `false`, `client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`}},
{`nsq_topic_depth`, int64(28), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}},
{`nsq_topic_backend_depth`, int64(29), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}},
{`nsq_topic_message_count`, int64(30), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}},
{`nsq_topic_channel_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`}},
{`nsq_channel_depth`, int64(15), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}},
{`nsq_channel_backend_depth`, int64(16), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}},
{`nsq_channel_inflight_count`, int64(17), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}},
{`nsq_channel_deferred_count`, int64(18), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}},
{`nsq_channel_message_count`, int64(19), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}},
{`nsq_channel_requeue_count`, int64(20), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}},
{`nsq_channel_timeout_count`, int64(21), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}},
{`nsq_channel_client_count`, int64(1), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`}},
{`nsq_client_ready_count`, int64(22), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}},
{`nsq_client_inflight_count`, int64(23), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}},
{`nsq_client_message_count`, int64(24), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}},
{`nsq_client_finish_count`, int64(25), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}},
{`nsq_client_requeue_count`, int64(26), map[string]string{`server_host`: host, `server_version`: `0.3.6`, `topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`, `client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`, `client_version`: `V2`, `client_address`: `172.17.0.8:48145`, `client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`, `client_snappy`: `true`, `client_deflate`: `true`}},
{
`nsq_server`,
map[string]interface{}{
"server_count": int64(1),
"topic_count": int64(2),
},
map[string]string{
`server_host`: host,
`server_version`: `0.3.6`,
},
},
{
`nsq_topic`,
map[string]interface{}{
"depth": int64(12),
"backend_depth": int64(13),
"message_count": int64(14),
"channel_count": int64(1),
},
map[string]string{
`server_host`: host,
`server_version`: `0.3.6`,
`topic`: `t1`},
},
{
"nsq_channel",
map[string]interface{}{
"depth": int64(0),
"backend_depth": int64(1),
"inflight_count": int64(2),
"deferred_count": int64(3),
"message_count": int64(4),
"requeue_count": int64(5),
"timeout_count": int64(6),
"client_count": int64(1),
},
map[string]string{
`server_host`: host,
`server_version`: `0.3.6`,
`topic`: `t1`,
`channel`: `c1`,
},
},
{
"nsq_client",
map[string]interface{}{
"ready_count": int64(200),
"inflight_count": int64(7),
"message_count": int64(8),
"finish_count": int64(9),
"requeue_count": int64(10),
},
map[string]string{`server_host`: host, `server_version`: `0.3.6`,
`topic`: `t1`, `channel`: `c1`, `client_name`: `373a715cd990`,
`client_id`: `373a715cd990`, `client_hostname`: `373a715cd990`,
`client_version`: `V2`, `client_address`: `172.17.0.11:35560`,
`client_tls`: `false`, `client_snappy`: `false`,
`client_deflate`: `false`,
`client_user_agent`: `nsq_to_nsq/0.3.6 go-nsq/1.0.5`},
},
{
`nsq_topic`,
map[string]interface{}{
"depth": int64(28),
"backend_depth": int64(29),
"message_count": int64(30),
"channel_count": int64(1),
},
map[string]string{
`server_host`: host,
`server_version`: `0.3.6`,
`topic`: `t2`},
},
{
"nsq_channel",
map[string]interface{}{
"depth": int64(15),
"backend_depth": int64(16),
"inflight_count": int64(17),
"deferred_count": int64(18),
"message_count": int64(19),
"requeue_count": int64(20),
"timeout_count": int64(21),
"client_count": int64(1),
},
map[string]string{
`server_host`: host,
`server_version`: `0.3.6`,
`topic`: `t2`,
`channel`: `c2`,
},
},
{
"nsq_client",
map[string]interface{}{
"ready_count": int64(22),
"inflight_count": int64(23),
"message_count": int64(24),
"finish_count": int64(25),
"requeue_count": int64(26),
},
map[string]string{`server_host`: host, `server_version`: `0.3.6`,
`topic`: `t2`, `channel`: `c2`, `client_name`: `377569bd462b`,
`client_id`: `377569bd462b`, `client_hostname`: `377569bd462b`,
`client_version`: `V2`, `client_address`: `172.17.0.8:48145`,
`client_user_agent`: `go-nsq/1.0.5`, `client_tls`: `true`,
`client_snappy`: `true`, `client_deflate`: `true`},
},
}

for _, test := range tests {
assert.True(t, acc.CheckTaggedValue(test.m, test.v, test.g), "Failed expectation: (\"%v\", \"%v\", \"%v\")", test.m, test.v, fmt.Sprint(test.g))
acc.AssertContainsTaggedFields(t, test.m, test.f, test.g)
}
}

Expand Down

0 comments on commit 7a8d1b7

Please sign in to comment.