Skip to content

Commit

Permalink
Report rabbitmq_node measurement and return on gather error (influxda…
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and Mathieu Lecarme committed Apr 17, 2020
1 parent 4260788 commit cf72fdc
Showing 1 changed file with 82 additions and 104 deletions.
186 changes: 82 additions & 104 deletions plugins/inputs/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,134 +448,112 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
}

func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
allNodes := make([]Node, 0)
// Gather information about nodes
allNodes := make([]*Node, 0)

err := r.requestJSON("/api/nodes", &allNodes)
if err != nil {
acc.AddError(err)
return
}

nodes := make(map[string]Node)
nodes := allNodes[:0]
for _, node := range allNodes {
if r.shouldGatherNode(node) {
nodes[node.Name] = node
nodes = append(nodes, node)
}
}

numberNodes := len(nodes)
if numberNodes == 0 {
return
}

type NodeCheck struct {
NodeName string
HealthCheck HealthCheck
Memory *Memory
}

nodeChecksChannel := make(chan NodeCheck, numberNodes)

var wg sync.WaitGroup
for _, node := range nodes {
go func(nodeName string, healthChecksChannel chan NodeCheck) {
var healthCheck HealthCheck
var memoryresponse MemoryResponse

err := r.requestJSON("/api/healthchecks/node/"+nodeName, &healthCheck)
nodeCheck := NodeCheck{
NodeName: nodeName,
HealthCheck: healthCheck,
wg.Add(1)
go func(node *Node) {
defer wg.Done()

tags := map[string]string{"url": r.URL}
tags["node"] = node.Name

fields := map[string]interface{}{
"disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit,
"disk_free_alarm": boolToInt(node.DiskFreeAlarm),
"fd_total": node.FdTotal,
"fd_used": node.FdUsed,
"mem_limit": node.MemLimit,
"mem_used": node.MemUsed,
"mem_alarm": boolToInt(node.MemAlarm),
"proc_total": node.ProcTotal,
"proc_used": node.ProcUsed,
"run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
"uptime": node.Uptime,
"mnesia_disk_tx_count": node.MnesiaDiskTxCount,
"mnesia_disk_tx_count_rate": node.MnesiaDiskTxCountDetails.Rate,
"mnesia_ram_tx_count": node.MnesiaRamTxCount,
"mnesia_ram_tx_count_rate": node.MnesiaRamTxCountDetails.Rate,
"gc_num": node.GcNum,
"gc_num_rate": node.GcNumDetails.Rate,
"gc_bytes_reclaimed": node.GcBytesReclaimed,
"gc_bytes_reclaimed_rate": node.GcBytesReclaimedDetails.Rate,
"io_read_avg_time": node.IoReadAvgTime,
"io_read_avg_time_rate": node.IoReadAvgTimeDetails.Rate,
"io_read_bytes": node.IoReadBytes,
"io_read_bytes_rate": node.IoReadBytesDetails.Rate,
"io_write_avg_time": node.IoWriteAvgTime,
"io_write_avg_time_rate": node.IoWriteAvgTimeDetails.Rate,
"io_write_bytes": node.IoWriteBytes,
"io_write_bytes_rate": node.IoWriteBytesDetails.Rate,
"running": boolToInt(node.Running),
}

var health HealthCheck
err := r.requestJSON("/api/healthchecks/node/"+node.Name, &health)
if err != nil {
acc.AddError(err)
return
}

err = r.requestJSON("/api/nodes/"+nodeName+"/memory", &memoryresponse)
nodeCheck.Memory = memoryresponse.Memory
if health.Status == "ok" {
fields["health_check_status"] = int64(1)
} else {
fields["health_check_status"] = int64(0)
}

var memory MemoryResponse
err = r.requestJSON("/api/nodes/"+node.Name+"/memory", &memory)
if err != nil {
acc.AddError(err)
return
}

nodeChecksChannel <- nodeCheck
}(node.Name, nodeChecksChannel)
}

now := time.Now()

for i := 0; i < len(nodes); i++ {
nodeCheck := <-nodeChecksChannel

var healthCheckStatus int64 = 0

if nodeCheck.HealthCheck.Status == "ok" {
healthCheckStatus = 1
}
if memory.Memory != nil {
fields["mem_connection_readers"] = memory.Memory.ConnectionReaders
fields["mem_connection_writers"] = memory.Memory.ConnectionWriters
fields["mem_connection_channels"] = memory.Memory.ConnectionChannels
fields["mem_connection_other"] = memory.Memory.ConnectionOther
fields["mem_queue_procs"] = memory.Memory.QueueProcs
fields["mem_queue_slave_procs"] = memory.Memory.QueueSlaveProcs
fields["mem_plugins"] = memory.Memory.Plugins
fields["mem_other_proc"] = memory.Memory.OtherProc
fields["mem_metrics"] = memory.Memory.Metrics
fields["mem_mgmt_db"] = memory.Memory.MgmtDb
fields["mem_mnesia"] = memory.Memory.Mnesia
fields["mem_other_ets"] = memory.Memory.OtherEts
fields["mem_binary"] = memory.Memory.Binary
fields["mem_msg_index"] = memory.Memory.MsgIndex
fields["mem_code"] = memory.Memory.Code
fields["mem_atom"] = memory.Memory.Atom
fields["mem_other_system"] = memory.Memory.OtherSystem
fields["mem_allocated_unused"] = memory.Memory.AllocatedUnused
fields["mem_reserved_unallocated"] = memory.Memory.ReservedUnallocated
fields["mem_total"] = memory.Memory.Total
}

node := nodes[nodeCheck.NodeName]

tags := map[string]string{"url": r.URL}
tags["node"] = node.Name

fields := map[string]interface{}{
"disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit,
"disk_free_alarm": boolToInt(node.DiskFreeAlarm),
"fd_total": node.FdTotal,
"fd_used": node.FdUsed,
"mem_limit": node.MemLimit,
"mem_used": node.MemUsed,
"mem_alarm": boolToInt(node.MemAlarm),
"proc_total": node.ProcTotal,
"proc_used": node.ProcUsed,
"run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
"uptime": node.Uptime,
"mnesia_disk_tx_count": node.MnesiaDiskTxCount,
"mnesia_disk_tx_count_rate": node.MnesiaDiskTxCountDetails.Rate,
"mnesia_ram_tx_count": node.MnesiaRamTxCount,
"mnesia_ram_tx_count_rate": node.MnesiaRamTxCountDetails.Rate,
"gc_num": node.GcNum,
"gc_num_rate": node.GcNumDetails.Rate,
"gc_bytes_reclaimed": node.GcBytesReclaimed,
"gc_bytes_reclaimed_rate": node.GcBytesReclaimedDetails.Rate,
"io_read_avg_time": node.IoReadAvgTime,
"io_read_avg_time_rate": node.IoReadAvgTimeDetails.Rate,
"io_read_bytes": node.IoReadBytes,
"io_read_bytes_rate": node.IoReadBytesDetails.Rate,
"io_write_avg_time": node.IoWriteAvgTime,
"io_write_avg_time_rate": node.IoWriteAvgTimeDetails.Rate,
"io_write_bytes": node.IoWriteBytes,
"io_write_bytes_rate": node.IoWriteBytesDetails.Rate,
"running": boolToInt(node.Running),
"health_check_status": healthCheckStatus,
}
if nodeCheck.Memory != nil {
fields["mem_connection_readers"] = nodeCheck.Memory.ConnectionReaders
fields["mem_connection_writers"] = nodeCheck.Memory.ConnectionWriters
fields["mem_connection_channels"] = nodeCheck.Memory.ConnectionChannels
fields["mem_connection_other"] = nodeCheck.Memory.ConnectionOther
fields["mem_queue_procs"] = nodeCheck.Memory.QueueProcs
fields["mem_queue_slave_procs"] = nodeCheck.Memory.QueueSlaveProcs
fields["mem_plugins"] = nodeCheck.Memory.Plugins
fields["mem_other_proc"] = nodeCheck.Memory.OtherProc
fields["mem_metrics"] = nodeCheck.Memory.Metrics
fields["mem_mgmt_db"] = nodeCheck.Memory.MgmtDb
fields["mem_mnesia"] = nodeCheck.Memory.Mnesia
fields["mem_other_ets"] = nodeCheck.Memory.OtherEts
fields["mem_binary"] = nodeCheck.Memory.Binary
fields["mem_msg_index"] = nodeCheck.Memory.MsgIndex
fields["mem_code"] = nodeCheck.Memory.Code
fields["mem_atom"] = nodeCheck.Memory.Atom
fields["mem_other_system"] = nodeCheck.Memory.OtherSystem
fields["mem_allocated_unused"] = nodeCheck.Memory.AllocatedUnused
fields["mem_reserved_unallocated"] = nodeCheck.Memory.ReservedUnallocated
fields["mem_total"] = nodeCheck.Memory.Total
}
acc.AddFields("rabbitmq_node", fields, tags, now)
acc.AddFields("rabbitmq_node", fields, tags)
}(node)
}

wg.Wait()
}

func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) {
Expand Down Expand Up @@ -718,7 +696,7 @@ func gatherFederationLinks(r *RabbitMQ, acc telegraf.Accumulator) {
}
}

func (r *RabbitMQ) shouldGatherNode(node Node) bool {
func (r *RabbitMQ) shouldGatherNode(node *Node) bool {
if len(r.Nodes) == 0 {
return true
}
Expand Down

0 comments on commit cf72fdc

Please sign in to comment.