Skip to content

Commit

Permalink
Implement AddGauge & AddCounter functions
Browse files Browse the repository at this point in the history
and utilize them in the in the 'system' input plugins.
  • Loading branch information
sparrc committed Sep 2, 2016
1 parent 6dbbe65 commit 5ceffc3
Show file tree
Hide file tree
Showing 12 changed files with 213 additions and 57 deletions.
14 changes: 14 additions & 0 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@ type Accumulator interface {
tags map[string]string,
t ...time.Time)

// AddGauge is the same as AddFields, but will add the metric as a "Gauge"
// type
AddGauge(measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time)

// AddCounter is the same as AddFields, but will add the metric as a "Counter"
// type
AddCounter(measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time)

AddError(err error)

Debug() bool
Expand Down
61 changes: 54 additions & 7 deletions agent/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,52 @@ func (ac *accumulator) AddFields(
tags map[string]string,
t ...time.Time,
) {
if m := ac.makeMetric(measurement, fields, tags, telegraf.Untyped, t...); m != nil {
ac.metrics <- m
}
}

func (ac *accumulator) AddGauge(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
if m := ac.makeMetric(measurement, fields, tags, telegraf.Gauge, t...); m != nil {
ac.metrics <- m
}
}

func (ac *accumulator) AddCounter(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
if m := ac.makeMetric(measurement, fields, tags, telegraf.Counter, t...); m != nil {
ac.metrics <- m
}
}

// makeMetric either returns a metric, or returns nil if the metric doesn't
// need to be created (because of filtering, an error, etc.)
func (ac *accumulator) makeMetric(
measurement string,
fields map[string]interface{},
tags map[string]string,
mType telegraf.ValueType,
t ...time.Time,
) telegraf.Metric {
if len(fields) == 0 || len(measurement) == 0 {
return
return nil
}

if !ac.inputConfig.Filter.ShouldNamePass(measurement) {
return
return nil
}

if !ac.inputConfig.Filter.ShouldTagsPass(tags) {
return
return nil
}

// Override measurement name if set
Expand Down Expand Up @@ -120,7 +156,7 @@ func (ac *accumulator) AddFields(
}
fields = nil
if len(result) == 0 {
return
return nil
}

var timestamp time.Time
Expand All @@ -131,15 +167,26 @@ func (ac *accumulator) AddFields(
}
timestamp = timestamp.Round(ac.precision)

m, err := telegraf.NewMetric(measurement, tags, result, timestamp)
var m telegraf.Metric
var err error
switch mType {
case telegraf.Counter:
m, err = telegraf.NewCounterMetric(measurement, tags, result, timestamp)
case telegraf.Gauge:
m, err = telegraf.NewGaugeMetric(measurement, tags, result, timestamp)
default:
m, err = telegraf.NewMetric(measurement, tags, result, timestamp)
}
if err != nil {
log.Printf("Error adding point [%s]: %s\n", measurement, err.Error())
return
return nil
}

if ac.trace {
fmt.Println("> " + m.String())
}
ac.metrics <- m

return m
}

// AddError passes a runtime error to the accumulator.
Expand Down
70 changes: 70 additions & 0 deletions agent/accumulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,76 @@ func TestAdd(t *testing.T) {
actual)
}

func TestAddGauge(t *testing.T) {
a := accumulator{}
now := time.Now()
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}

a.AddGauge("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{})
a.AddGauge("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"})
a.AddGauge("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"}, now)

testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest value=101")
assert.Equal(t, testm.Type(), telegraf.Gauge)

testm = <-a.metrics
actual = testm.String()
assert.Contains(t, actual, "acctest,acc=test value=101")
assert.Equal(t, testm.Type(), telegraf.Gauge)

testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()),
actual)
assert.Equal(t, testm.Type(), telegraf.Gauge)
}

func TestAddCounter(t *testing.T) {
a := accumulator{}
now := time.Now()
a.metrics = make(chan telegraf.Metric, 10)
defer close(a.metrics)
a.inputConfig = &models.InputConfig{}

a.AddCounter("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{})
a.AddCounter("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"})
a.AddCounter("acctest",
map[string]interface{}{"value": float64(101)},
map[string]string{"acc": "test"}, now)

testm := <-a.metrics
actual := testm.String()
assert.Contains(t, actual, "acctest value=101")
assert.Equal(t, testm.Type(), telegraf.Counter)

testm = <-a.metrics
actual = testm.String()
assert.Contains(t, actual, "acctest,acc=test value=101")
assert.Equal(t, testm.Type(), telegraf.Counter)

testm = <-a.metrics
actual = testm.String()
assert.Equal(t,
fmt.Sprintf("acctest,acc=test value=101 %d", now.UnixNano()),
actual)
assert.Equal(t, testm.Type(), telegraf.Counter)
}

func TestAddNoPrecisionWithInterval(t *testing.T) {
a := accumulator{}
now := time.Date(2006, time.February, 10, 12, 0, 0, 82912748, time.UTC)
Expand Down
65 changes: 35 additions & 30 deletions plugins/inputs/system/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ type CPUStats struct {
ps PS
lastStats []cpu.TimesStat

PerCPU bool `toml:"percpu"`
TotalCPU bool `toml:"totalcpu"`
PerCPU bool `toml:"percpu"`
TotalCPU bool `toml:"totalcpu"`
CollectCPUTime bool `toml:"collect_cpu_time"`
}

func NewCPUStats(ps PS) *CPUStats {
return &CPUStats{
ps: ps,
ps: ps,
CollectCPUTime: true,
}
}

Expand All @@ -32,8 +34,8 @@ var sampleConfig = `
percpu = true
## Whether to report total system cpu stats or not
totalcpu = true
## Comment this line if you want the raw CPU time metrics
fielddrop = ["time_*"]
## If true, collect raw CPU time metrics.
collect_cpu_time = false
`

func (_ *CPUStats) SampleConfig() string {
Expand All @@ -54,23 +56,25 @@ func (s *CPUStats) Gather(acc telegraf.Accumulator) error {

total := totalCpuTime(cts)

// Add cpu time metrics
fields := map[string]interface{}{
"time_user": cts.User,
"time_system": cts.System,
"time_idle": cts.Idle,
"time_nice": cts.Nice,
"time_iowait": cts.Iowait,
"time_irq": cts.Irq,
"time_softirq": cts.Softirq,
"time_steal": cts.Steal,
"time_guest": cts.Guest,
"time_guest_nice": cts.GuestNice,
if s.CollectCPUTime {
// Add cpu time metrics
fieldsC := map[string]interface{}{
"time_user": cts.User,
"time_system": cts.System,
"time_idle": cts.Idle,
"time_nice": cts.Nice,
"time_iowait": cts.Iowait,
"time_irq": cts.Irq,
"time_softirq": cts.Softirq,
"time_steal": cts.Steal,
"time_guest": cts.Guest,
"time_guest_nice": cts.GuestNice,
}
acc.AddCounter("cpu", fieldsC, tags, now)
}

// Add in percentage
if len(s.lastStats) == 0 {
acc.AddFields("cpu", fields, tags, now)
// If it's the 1st gather, can't get CPU Usage stats yet
continue
}
Expand All @@ -86,18 +90,19 @@ func (s *CPUStats) Gather(acc telegraf.Accumulator) error {
if totalDelta == 0 {
continue
}

fields["usage_user"] = 100 * (cts.User - lastCts.User) / totalDelta
fields["usage_system"] = 100 * (cts.System - lastCts.System) / totalDelta
fields["usage_idle"] = 100 * (cts.Idle - lastCts.Idle) / totalDelta
fields["usage_nice"] = 100 * (cts.Nice - lastCts.Nice) / totalDelta
fields["usage_iowait"] = 100 * (cts.Iowait - lastCts.Iowait) / totalDelta
fields["usage_irq"] = 100 * (cts.Irq - lastCts.Irq) / totalDelta
fields["usage_softirq"] = 100 * (cts.Softirq - lastCts.Softirq) / totalDelta
fields["usage_steal"] = 100 * (cts.Steal - lastCts.Steal) / totalDelta
fields["usage_guest"] = 100 * (cts.Guest - lastCts.Guest) / totalDelta
fields["usage_guest_nice"] = 100 * (cts.GuestNice - lastCts.GuestNice) / totalDelta
acc.AddFields("cpu", fields, tags, now)
fieldsG := map[string]interface{}{
"usage_user": 100 * (cts.User - lastCts.User) / totalDelta,
"usage_system": 100 * (cts.System - lastCts.System) / totalDelta,
"usage_idle": 100 * (cts.Idle - lastCts.Idle) / totalDelta,
"usage_nice": 100 * (cts.Nice - lastCts.Nice) / totalDelta,
"usage_iowait": 100 * (cts.Iowait - lastCts.Iowait) / totalDelta,
"usage_irq": 100 * (cts.Irq - lastCts.Irq) / totalDelta,
"usage_softirq": 100 * (cts.Softirq - lastCts.Softirq) / totalDelta,
"usage_steal": 100 * (cts.Steal - lastCts.Steal) / totalDelta,
"usage_guest": 100 * (cts.Guest - lastCts.Guest) / totalDelta,
"usage_guest_nice": 100 * (cts.GuestNice - lastCts.GuestNice) / totalDelta,
}
acc.AddGauge("cpu", fieldsG, tags, now)
}

s.lastStats = times
Expand Down
4 changes: 2 additions & 2 deletions plugins/inputs/system/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s *DiskStats) Gather(acc telegraf.Accumulator) error {
"inodes_free": du.InodesFree,
"inodes_used": du.InodesUsed,
}
acc.AddFields("disk", fields, tags)
acc.AddGauge("disk", fields, tags)
}

return nil
Expand Down Expand Up @@ -139,7 +139,7 @@ func (s *DiskIOStats) Gather(acc telegraf.Accumulator) error {
"write_time": io.WriteTime,
"io_time": io.IoTime,
}
acc.AddFields("diskio", fields, tags)
acc.AddCounter("diskio", fields, tags)
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/system/kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (k *Kernel) Gather(acc telegraf.Accumulator) error {
}
}

acc.AddFields("kernel", fields, map[string]string{})
acc.AddCounter("kernel", fields, map[string]string{})

return nil
}
Expand Down
13 changes: 8 additions & 5 deletions plugins/inputs/system/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *MemStats) Gather(acc telegraf.Accumulator) error {
"used_percent": 100 * float64(vm.Used) / float64(vm.Total),
"available_percent": 100 * float64(vm.Available) / float64(vm.Total),
}
acc.AddFields("mem", fields, nil)
acc.AddCounter("mem", fields, nil)

return nil
}
Expand All @@ -56,15 +56,18 @@ func (s *SwapStats) Gather(acc telegraf.Accumulator) error {
return fmt.Errorf("error getting swap memory info: %s", err)
}

fields := map[string]interface{}{
fieldsG := map[string]interface{}{
"total": swap.Total,
"used": swap.Used,
"free": swap.Free,
"used_percent": swap.UsedPercent,
"in": swap.Sin,
"out": swap.Sout,
}
acc.AddFields("swap", fields, nil)
fieldsC := map[string]interface{}{
"in": swap.Sin,
"out": swap.Sout,
}
acc.AddGauge("swap", fieldsG, nil)
acc.AddCounter("swap", fieldsC, nil)

return nil
}
Expand Down
2 changes: 0 additions & 2 deletions plugins/inputs/system/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ func TestMemStats(t *testing.T) {
"used": uint64(1232),
"used_percent": float64(12.2),
"free": uint64(6412),
"in": uint64(7),
"out": uint64(830),
}
acc.AssertContainsTaggedFields(t, "swap", swapfields, make(map[string]string))
}
2 changes: 1 addition & 1 deletion plugins/inputs/system/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *NetIOStats) Gather(acc telegraf.Accumulator) error {
"drop_in": io.Dropin,
"drop_out": io.Dropout,
}
acc.AddFields("net", fields, tags)
acc.AddCounter("net", fields, tags)
}

// Get system wide stats for different network protocols
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/system/processes.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (p *Processes) Gather(acc telegraf.Accumulator) error {
}
}

acc.AddFields("processes", fields, nil)
acc.AddGauge("processes", fields, nil)
return nil
}

Expand Down
Loading

0 comments on commit 5ceffc3

Please sign in to comment.