Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the ability to filter by metric tags #103

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,46 @@ want to add support for another service or third-party API.
There are 3 configuration options that are configurable per plugin:

* **pass**: An array of strings that is used to filter metrics generated by the
current plugin. Each string in the array is tested as a prefix against metrics
current plugin. Each string in the array is tested as a prefix against metric names
and if it matches, the metric is emitted.
* **drop**: The inverse of pass, if a metric matches, it is not emitted.
* **drop**: The inverse of pass, if a metric name matches, it is not emitted.
* **tagpass**: tag names and arrays of strings that are used to filter metrics by
the current plugin. Each string in the array is tested as an exact match against
the tag name, and if it matches the metric is emitted.
* **tagdrop**: The inverse of tagpass. If a tag matches, the metric is not emitted.
This is tested on metrics that have passed the tagpass test.
* **interval**: How often to gather this metric. Normal plugins use a single
global interval, but if one particular plugin should be run less or more often,
you can configure that here.

### Plugin Configuration Examples

```
# Read metrics about disk usage by mount point
[disk]
interval = "1m" # Run at a 1 minute interval instead of the default

[disk.tagpass]
# These tag conditions are OR, not AND.
# If the (filesystem is ext4 or xfs) or (the path is /opt or /home) then the metric passes
fstype = [ "ext4", "xfs" ]
path = [ "/opt", "/home" ]

[postgresql]

[postgresql.tagdrop]
# Don't report stats about the database name 'testdatabase'
db = [ "testdatabase" ]

```

```
[disk]
# Don't report stats about the following filesystem types
[disk.tagdrop]
fstype = [ "nfs", "tmpfs", "ecryptfs" ]
```

## Plugins

This section is for developers that want to create new collection plugins.
Expand Down
4 changes: 2 additions & 2 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (bp *BatchPoints) Add(measurement string, val interface{}, tags map[string]
measurement = bp.Prefix + measurement

if bp.Config != nil {
if !bp.Config.ShouldPass(measurement) {
if !bp.Config.ShouldPass(measurement, tags) {
return
}
}
Expand Down Expand Up @@ -71,7 +71,7 @@ func (bp *BatchPoints) AddValuesWithTime(
measurement = bp.Prefix + measurement

if bp.Config != nil {
if !bp.Config.ShouldPass(measurement) {
if !bp.Config.ShouldPass(measurement, tags) {
return
}
}
Expand Down
76 changes: 75 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,28 @@ func (c *Config) Plugins() map[string]*ast.Table {
return c.plugins
}

// The name of a tag, and the values on which to filter
type TagFilter struct {
Name string
Filter []string
}

// ConfiguredPlugin containing a name, interval, and drop/pass prefix lists
// Also lists the tags to filter
type ConfiguredPlugin struct {
Name string

Drop []string
Pass []string

TagDrop []TagFilter
TagPass []TagFilter

Interval time.Duration
}

// ShouldPass returns true if the metric should pass, false if should drop
func (cp *ConfiguredPlugin) ShouldPass(measurement string) bool {
func (cp *ConfiguredPlugin) ShouldPass(measurement string, tags map[string]string) bool {
if cp.Pass != nil {
for _, pat := range cp.Pass {
if strings.HasPrefix(measurement, pat) {
Expand All @@ -83,6 +93,32 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string) bool {
return true
}

if cp.TagPass != nil {
for _, pat := range cp.TagPass {
if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter {
if filter == tagval {
return true
}
}
}
}
return false
}

if cp.TagDrop != nil {
for _, pat := range cp.TagDrop {
if tagval, ok := tags[pat.Name]; ok {
for _, filter := range pat.Filter {
if filter == tagval {
return false
}
}
}
}
return true
}

return true
}

Expand Down Expand Up @@ -140,9 +176,47 @@ func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, err
}
}

if node, ok := tbl.Fields["tagpass"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
tagfilter := &TagFilter{Name: name}
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
tagfilter.Filter = append(tagfilter.Filter, str.Value)
}
}
}
cp.TagPass = append(cp.TagPass, *tagfilter)
}
}
}
}

if node, ok := tbl.Fields["tagdrop"]; ok {
if subtbl, ok := node.(*ast.Table); ok {
for name, val := range subtbl.Fields {
if kv, ok := val.(*ast.KeyValue); ok {
tagfilter := &TagFilter{Name: name}
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
tagfilter.Filter = append(tagfilter.Filter, str.Value)
}
}
}
cp.TagDrop = append(cp.TagDrop, *tagfilter)
}
}
}
}

delete(tbl.Fields, "drop")
delete(tbl.Fields, "pass")
delete(tbl.Fields, "interval")
delete(tbl.Fields, "tagdrop")
delete(tbl.Fields, "tagpass")
return cp, toml.UnmarshalTable(tbl, v)
}

Expand Down
4 changes: 2 additions & 2 deletions plugins/system/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ func (s *DiskStats) Gather(acc plugins.Accumulator) error {

for _, du := range disks {
tags := map[string]string{
"path": du.Path,
"path": du.Path,
"fstype": du.Fstype,
}

acc.Add("total", du.Total, tags)
acc.Add("free", du.Free, tags)
acc.Add("used", du.Total-du.Free, tags)
Expand Down
1 change: 1 addition & 0 deletions plugins/system/ps.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (s *systemPS) DiskUsage() ([]*disk.DiskUsageStat, error) {

for _, p := range parts {
du, err := disk.DiskUsage(p.Mountpoint)
du.Fstype = p.Fstype
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions plugins/system/ps/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

type DiskUsageStat struct {
Path string `json:"path"`
Fstype string `json:"fstype"`
Total uint64 `json:"total"`
Free uint64 `json:"free"`
Used uint64 `json:"used"`
Expand Down
3 changes: 2 additions & 1 deletion plugins/system/ps/disk/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestDisk_io_counters(t *testing.T) {
func TestDiskUsageStat_String(t *testing.T) {
v := DiskUsageStat{
Path: "/",
Fstype: "ext4",
Total: 1000,
Free: 2000,
Used: 3000,
Expand All @@ -68,7 +69,7 @@ func TestDiskUsageStat_String(t *testing.T) {
InodesFree: 6000,
InodesUsedPercent: 49.1,
}
e := `{"path":"/","total":1000,"free":2000,"used":3000,"used_percent":50.1,"inodes_total":4000,"inodes_used":5000,"inodes_free":6000,"inodes_used_percent":49.1}`
e := `{"path":"/","fstype":"ext4","total":1000,"free":2000,"used":3000,"used_percent":50.1,"inodes_total":4000,"inodes_used":5000,"inodes_free":6000,"inodes_used_percent":49.1}`
if e != fmt.Sprintf("%v", v) {
t.Errorf("DiskUsageStat string is invalid: %v", v)
}
Expand Down
4 changes: 3 additions & 1 deletion plugins/system/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestSystemStats_GenerateStats(t *testing.T) {

du := &disk.DiskUsageStat{
Path: "/",
Fstype: "ext4",
Total: 128,
Free: 23,
InodesTotal: 1234,
Expand Down Expand Up @@ -195,7 +196,8 @@ func TestSystemStats_GenerateStats(t *testing.T) {
require.NoError(t, err)

tags := map[string]string{
"path": "/",
"path": "/",
"fstype": "ext4",
}

assert.True(t, acc.CheckTaggedValue("total", uint64(128), tags))
Expand Down