Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report rabbitmq_node measurement and return on gather error #6819

Merged
merged 2 commits into from
Jan 3, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 82 additions & 106 deletions plugins/inputs/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,134 +448,110 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
}

func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
allNodes := make([]Node, 0)
// Gather information about nodes
allNodes := make([]*Node, 0)

err := r.requestJSON("/api/nodes", &allNodes)
if err != nil {
acc.AddError(err)
return
}

nodes := make(map[string]Node)
nodes := allNodes[:0]
for _, node := range allNodes {
if r.shouldGatherNode(node) {
nodes[node.Name] = node
nodes = append(nodes, node)
}
}

numberNodes := len(nodes)
if numberNodes == 0 {
return
}

type NodeCheck struct {
NodeName string
HealthCheck HealthCheck
Memory *Memory
}

nodeChecksChannel := make(chan NodeCheck, numberNodes)

var wg sync.WaitGroup
for _, node := range nodes {
go func(nodeName string, healthChecksChannel chan NodeCheck) {
var healthCheck HealthCheck
var memoryresponse MemoryResponse

err := r.requestJSON("/api/healthchecks/node/"+nodeName, &healthCheck)
nodeCheck := NodeCheck{
NodeName: nodeName,
HealthCheck: healthCheck,
wg.Add(1)
go func(node *Node) {
defer wg.Done()

tags := map[string]string{"url": r.URL}
tags["node"] = node.Name

fields := map[string]interface{}{
"disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit,
"disk_free_alarm": boolToInt(node.DiskFreeAlarm),
"fd_total": node.FdTotal,
"fd_used": node.FdUsed,
"mem_limit": node.MemLimit,
"mem_used": node.MemUsed,
"mem_alarm": boolToInt(node.MemAlarm),
"proc_total": node.ProcTotal,
"proc_used": node.ProcUsed,
"run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
"uptime": node.Uptime,
"mnesia_disk_tx_count": node.MnesiaDiskTxCount,
"mnesia_disk_tx_count_rate": node.MnesiaDiskTxCountDetails.Rate,
"mnesia_ram_tx_count": node.MnesiaRamTxCount,
"mnesia_ram_tx_count_rate": node.MnesiaRamTxCountDetails.Rate,
"gc_num": node.GcNum,
"gc_num_rate": node.GcNumDetails.Rate,
"gc_bytes_reclaimed": node.GcBytesReclaimed,
"gc_bytes_reclaimed_rate": node.GcBytesReclaimedDetails.Rate,
"io_read_avg_time": node.IoReadAvgTime,
"io_read_avg_time_rate": node.IoReadAvgTimeDetails.Rate,
"io_read_bytes": node.IoReadBytes,
"io_read_bytes_rate": node.IoReadBytesDetails.Rate,
"io_write_avg_time": node.IoWriteAvgTime,
"io_write_avg_time_rate": node.IoWriteAvgTimeDetails.Rate,
"io_write_bytes": node.IoWriteBytes,
"io_write_bytes_rate": node.IoWriteBytesDetails.Rate,
"running": boolToInt(node.Running),
}

var health HealthCheck
err := r.requestJSON("/api/healthchecks/node/"+node.Name, &health)
if err != nil {
acc.AddError(err)
return
}

err = r.requestJSON("/api/nodes/"+nodeName+"/memory", &memoryresponse)
nodeCheck.Memory = memoryresponse.Memory
if health.Status == "ok" {
fields["health_check_status"] = int64(1)
} else {
fields["health_check_status"] = int64(0)
}

var memory MemoryResponse
err = r.requestJSON("/api/nodes/"+node.Name+"/memory", &memory)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok to make another request when the health check failed? Now that we have "defer wg.done()" shouldn't we return early when health check fails?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Second request is safe to do, but is somewhat likely to fail in the case of an earlier error from requestJSON. It is probably best to not create a partial metric, so I'll change this to return early.

if err != nil {
acc.AddError(err)
return
}

nodeChecksChannel <- nodeCheck
}(node.Name, nodeChecksChannel)
}

now := time.Now()

for i := 0; i < len(nodes); i++ {
nodeCheck := <-nodeChecksChannel

var healthCheckStatus int64 = 0

if nodeCheck.HealthCheck.Status == "ok" {
healthCheckStatus = 1
}
if memory.Memory != nil {
fields["mem_connection_readers"] = memory.Memory.ConnectionReaders
fields["mem_connection_writers"] = memory.Memory.ConnectionWriters
fields["mem_connection_channels"] = memory.Memory.ConnectionChannels
fields["mem_connection_other"] = memory.Memory.ConnectionOther
fields["mem_queue_procs"] = memory.Memory.QueueProcs
fields["mem_queue_slave_procs"] = memory.Memory.QueueSlaveProcs
fields["mem_plugins"] = memory.Memory.Plugins
fields["mem_other_proc"] = memory.Memory.OtherProc
fields["mem_metrics"] = memory.Memory.Metrics
fields["mem_mgmt_db"] = memory.Memory.MgmtDb
fields["mem_mnesia"] = memory.Memory.Mnesia
fields["mem_other_ets"] = memory.Memory.OtherEts
fields["mem_binary"] = memory.Memory.Binary
fields["mem_msg_index"] = memory.Memory.MsgIndex
fields["mem_code"] = memory.Memory.Code
fields["mem_atom"] = memory.Memory.Atom
fields["mem_other_system"] = memory.Memory.OtherSystem
fields["mem_allocated_unused"] = memory.Memory.AllocatedUnused
fields["mem_reserved_unallocated"] = memory.Memory.ReservedUnallocated
fields["mem_total"] = memory.Memory.Total
}

node := nodes[nodeCheck.NodeName]

tags := map[string]string{"url": r.URL}
tags["node"] = node.Name

fields := map[string]interface{}{
"disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit,
"disk_free_alarm": boolToInt(node.DiskFreeAlarm),
"fd_total": node.FdTotal,
"fd_used": node.FdUsed,
"mem_limit": node.MemLimit,
"mem_used": node.MemUsed,
"mem_alarm": boolToInt(node.MemAlarm),
"proc_total": node.ProcTotal,
"proc_used": node.ProcUsed,
"run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
"uptime": node.Uptime,
"mnesia_disk_tx_count": node.MnesiaDiskTxCount,
"mnesia_disk_tx_count_rate": node.MnesiaDiskTxCountDetails.Rate,
"mnesia_ram_tx_count": node.MnesiaRamTxCount,
"mnesia_ram_tx_count_rate": node.MnesiaRamTxCountDetails.Rate,
"gc_num": node.GcNum,
"gc_num_rate": node.GcNumDetails.Rate,
"gc_bytes_reclaimed": node.GcBytesReclaimed,
"gc_bytes_reclaimed_rate": node.GcBytesReclaimedDetails.Rate,
"io_read_avg_time": node.IoReadAvgTime,
"io_read_avg_time_rate": node.IoReadAvgTimeDetails.Rate,
"io_read_bytes": node.IoReadBytes,
"io_read_bytes_rate": node.IoReadBytesDetails.Rate,
"io_write_avg_time": node.IoWriteAvgTime,
"io_write_avg_time_rate": node.IoWriteAvgTimeDetails.Rate,
"io_write_bytes": node.IoWriteBytes,
"io_write_bytes_rate": node.IoWriteBytesDetails.Rate,
"running": boolToInt(node.Running),
"health_check_status": healthCheckStatus,
}
if nodeCheck.Memory != nil {
fields["mem_connection_readers"] = nodeCheck.Memory.ConnectionReaders
fields["mem_connection_writers"] = nodeCheck.Memory.ConnectionWriters
fields["mem_connection_channels"] = nodeCheck.Memory.ConnectionChannels
fields["mem_connection_other"] = nodeCheck.Memory.ConnectionOther
fields["mem_queue_procs"] = nodeCheck.Memory.QueueProcs
fields["mem_queue_slave_procs"] = nodeCheck.Memory.QueueSlaveProcs
fields["mem_plugins"] = nodeCheck.Memory.Plugins
fields["mem_other_proc"] = nodeCheck.Memory.OtherProc
fields["mem_metrics"] = nodeCheck.Memory.Metrics
fields["mem_mgmt_db"] = nodeCheck.Memory.MgmtDb
fields["mem_mnesia"] = nodeCheck.Memory.Mnesia
fields["mem_other_ets"] = nodeCheck.Memory.OtherEts
fields["mem_binary"] = nodeCheck.Memory.Binary
fields["mem_msg_index"] = nodeCheck.Memory.MsgIndex
fields["mem_code"] = nodeCheck.Memory.Code
fields["mem_atom"] = nodeCheck.Memory.Atom
fields["mem_other_system"] = nodeCheck.Memory.OtherSystem
fields["mem_allocated_unused"] = nodeCheck.Memory.AllocatedUnused
fields["mem_reserved_unallocated"] = nodeCheck.Memory.ReservedUnallocated
fields["mem_total"] = nodeCheck.Memory.Total
}
acc.AddFields("rabbitmq_node", fields, tags, now)
acc.AddFields("rabbitmq_node", fields, tags)
}(node)
}

wg.Wait()
}

func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) {
Expand Down Expand Up @@ -718,7 +694,7 @@ func gatherFederationLinks(r *RabbitMQ, acc telegraf.Accumulator) {
}
}

func (r *RabbitMQ) shouldGatherNode(node Node) bool {
func (r *RabbitMQ) shouldGatherNode(node *Node) bool {
if len(r.Nodes) == 0 {
return true
}
Expand Down