diff --git a/collector/collector.go b/collector/collector.go index 50782a4..eddd50b 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -9,7 +9,7 @@ import ( ) type GroupedProcCollector struct { - GroupedProcs map[string]*grouped_proc.GroupedProc + GroupedProcs *grouped_proc.GroupedProcs Metrics map[metric.MetricKey]metric.Metric Enabled map[metric.MetricKey]bool Grouper grouper.Grouper @@ -30,24 +30,25 @@ func (c *GroupedProcCollector) Describe(ch chan<- *prometheus.Desc) { func (c *GroupedProcCollector) Collect(ch chan<- prometheus.Metric) { _ = c.Grouper.Collect(c.GroupedProcs, c.Enabled) - for group, proc := range c.GroupedProcs { + c.GroupedProcs.Range(func(group string, proc *grouped_proc.GroupedProc) bool { log.Debugf("Collect grouped process: %s: %#v\n", group, proc) if !proc.Exists { - delete(c.GroupedProcs, group) + c.GroupedProcs.Delete(group) log.Debugf("Delete grouped process: %s\n", group) - continue + return true } for key, metric := range proc.Metrics { if proc.Enabled[key] { err := metric.PushCollected(ch, c.descs, c.Grouper.Name(), group) if err != nil { // TODO: metric.PushDefaultMetric(ch, c.descs, c.Grouper.Name(), group) - continue + return true } } } proc.Exists = false - } + return true + }) } func (c *GroupedProcCollector) EnableMetric(metric metric.MetricKey) { @@ -61,9 +62,9 @@ func (c *GroupedProcCollector) DisableMetric(metric metric.MetricKey) { // NewGroupedProcCollector func NewGroupedProcCollector(g grouper.Grouper) (*GroupedProcCollector, error) { return &GroupedProcCollector{ - GroupedProcs: make(map[string]*grouped_proc.GroupedProc), + GroupedProcs: grouped_proc.NewGroupedProcs(), Metrics: metric.AvairableMetrics(), - Enabled: grouped_proc.DefaultEnabledMetrics(), + Enabled: metric.DefaultEnabledMetrics(), Grouper: g, descs: make(map[string]*prometheus.Desc), }, nil diff --git a/go.mod b/go.mod index 0440daf..631f233 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,6 @@ require ( github.com/sirupsen/logrus v1.4.2 github.com/spf13/cobra v0.0.5 github.com/stretchr/testify v1.4.0 // indirect - golang.org/x/sys v0.0.0-20190902133755-9109b7679e13 // indirect + golang.org/x/sys v0.0.0-20190904005037-43c01164e931 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect ) diff --git a/go.sum b/go.sum index 4d60671..072c2cb 100644 --- a/go.sum +++ b/go.sum @@ -109,8 +109,8 @@ golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190902133755-9109b7679e13 h1:tdsQdquKbTNMsSZLqnLELJGzCANp9oXhu6zFBW6ODx4= -golang.org/x/sys v0.0.0-20190902133755-9109b7679e13/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190904005037-43c01164e931 h1:+WYfosiOJzB4BjsISl1Rv4ZLUy+VYcF+u+0Y9jcerv8= +golang.org/x/sys v0.0.0-20190904005037-43c01164e931/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= diff --git a/grouped_proc/grouped_proc.go b/grouped_proc/grouped_proc.go index ebea35d..c45cab5 100644 --- a/grouped_proc/grouped_proc.go +++ b/grouped_proc/grouped_proc.go @@ -29,15 +29,6 @@ func NewGroupedProc(enabled map[metric.MetricKey]bool) *GroupedProc { } } -func DefaultEnabledMetrics() map[metric.MetricKey]bool { - enabled := make(map[metric.MetricKey]bool) - for _, k := range metric.MetricKeys { - enabled[k] = false - } - enabled[metric.ProcProcs] = true - return enabled -} - func (g *GroupedProc) AppendProcAndCollect(pid int) error { fs, err := procfs.NewFS(g.ProcMountPoint) if err != nil { @@ -61,3 +52,42 @@ func (g *GroupedProc) AppendProcAndCollect(pid int) error { return nil } + +type GroupedProcs struct { + sm sync.Map +} + +func (m *GroupedProcs) Load(group string) (*GroupedProc, bool) { + gproc, ok := m.sm.Load(group) + if !ok { + return nil, false + } + return gproc.(*GroupedProc), true +} + +func (m *GroupedProcs) Store(group string, gproc *GroupedProc) { + m.sm.Store(group, gproc) +} + +func (m *GroupedProcs) Delete(group string) { + m.sm.Delete(group) +} + +func (m *GroupedProcs) Range(f func(group string, gproc *GroupedProc) bool) { + m.sm.Range(func(key, value interface{}) bool { + return f(key.(string), value.(*GroupedProc)) + }) +} + +func (m *GroupedProcs) Length() int { + l := 0 + m.Range(func(group string, gproc *GroupedProc) bool { + l = l + 1 + return true + }) + return l +} + +func NewGroupedProcs() *GroupedProcs { + return &GroupedProcs{} +} diff --git a/grouper/cgroup/cgroup.go b/grouper/cgroup/cgroup.go index b521b96..eef9aac 100644 --- a/grouper/cgroup/cgroup.go +++ b/grouper/cgroup/cgroup.go @@ -41,7 +41,7 @@ func (c *Cgroup) Name() string { return "cgroup" } -func (c *Cgroup) Collect(gpMap map[string]*grouped_proc.GroupedProc, enabled map[metric.MetricKey]bool) error { +func (c *Cgroup) Collect(gprocs *grouped_proc.GroupedProcs, enabled map[metric.MetricKey]bool) error { wg := &sync.WaitGroup{} for _, s := range Subsystems { @@ -68,11 +68,16 @@ func (c *Cgroup) Collect(gpMap map[string]*grouped_proc.GroupedProc, enabled map _ = f.Close() return nil } - _, ok := gpMap[cPath] + var ( + gproc *grouped_proc.GroupedProc + ok bool + ) + gproc, ok = gprocs.Load(cPath) if !ok { - gpMap[cPath] = grouped_proc.NewGroupedProc(enabled) + gproc = grouped_proc.NewGroupedProc(enabled) + gprocs.Store(cPath, gproc) } - gpMap[cPath].Exists = true + gproc.Exists = true reader := bufio.NewReaderSize(f, 1028) for { line, _, err := reader.ReadLine() @@ -92,7 +97,7 @@ func (c *Cgroup) Collect(gpMap map[string]*grouped_proc.GroupedProc, enabled map go func(wg *sync.WaitGroup, pid int, g *grouped_proc.GroupedProc) { _ = g.AppendProcAndCollect(pid) wg.Done() - }(wg, pid, gpMap[cPath]) + }(wg, pid, gproc) } } return nil diff --git a/grouper/cgroup/cgroup_test.go b/grouper/cgroup/cgroup_test.go index da8e11a..9ed2d50 100644 --- a/grouper/cgroup/cgroup_test.go +++ b/grouper/cgroup/cgroup_test.go @@ -15,23 +15,23 @@ const ( func TestCollect(t *testing.T) { cgroup := testCgroup() - gpMap := make(map[string]*grouped_proc.GroupedProc) + gprocs := grouped_proc.NewGroupedProcs() enabled := make(map[metric.MetricKey]bool) enabled[metric.ProcIO] = true enabled[metric.ProcStat] = true - err := cgroup.Collect(gpMap, enabled) + err := cgroup.Collect(gprocs, enabled) if err != nil { t.Fatalf("%v", err) } - if len(gpMap) != 2 { - t.Errorf("want %d, got %d", 2, len(gpMap)) + if gprocs.Length() != 2 { + t.Errorf("want %d, got %d", 2, gprocs.Length()) } - if _, ok := gpMap["/system.slice/nginx.service"]; !ok { + if _, ok := gprocs.Load("/system.slice/nginx.service"); !ok { t.Errorf("want %s, got none", "/system.slice/nginx.service") } - if _, ok := gpMap["/system.slice/mysql.service"]; !ok { + if _, ok := gprocs.Load("/system.slice/mysql.service"); !ok { t.Errorf("want %s, got none", "/system.slice/mysql.service") } } @@ -42,20 +42,20 @@ func TestCollectWithNormalize(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - gpMap := make(map[string]*grouped_proc.GroupedProc) + gprocs := grouped_proc.NewGroupedProcs() enabled := make(map[metric.MetricKey]bool) enabled[metric.ProcIO] = true enabled[metric.ProcStat] = true - err = cgroup.Collect(gpMap, enabled) + err = cgroup.Collect(gprocs, enabled) if err != nil { t.Fatalf("%v", err) } - if len(gpMap) != 1 { - t.Errorf("want %d, got %d", 1, len(gpMap)) + if gprocs.Length() != 1 { + t.Errorf("want %d, got %d", 1, gprocs.Length()) } - if _, ok := gpMap["system.slice"]; !ok { + if _, ok := gprocs.Load("system.slice"); !ok { t.Errorf("want %s, got none", "system.slice") } } diff --git a/grouper/grouper.go b/grouper/grouper.go index b99d337..213db57 100644 --- a/grouper/grouper.go +++ b/grouper/grouper.go @@ -8,5 +8,5 @@ import ( type Grouper interface { Name() string SetNormalizeRegexp(nReStr string) error - Collect(gpMap map[string]*grouped_proc.GroupedProc, enabled map[metric.MetricKey]bool) error + Collect(gprocs *grouped_proc.GroupedProcs, enabled map[metric.MetricKey]bool) error } diff --git a/grouper/proc_status_name/proc_status_name.go b/grouper/proc_status_name/proc_status_name.go index 989be82..2ea7fb3 100644 --- a/grouper/proc_status_name/proc_status_name.go +++ b/grouper/proc_status_name/proc_status_name.go @@ -25,7 +25,7 @@ func (g *ProcStatusName) Name() string { return "proc_status_name" } -func (g *ProcStatusName) Collect(gpMap map[string]*grouped_proc.GroupedProc, enabled map[metric.MetricKey]bool) error { +func (g *ProcStatusName) Collect(gprocs *grouped_proc.GroupedProcs, enabled map[metric.MetricKey]bool) error { wg := &sync.WaitGroup{} fs, err := procfs.NewFS(g.procMountPoint) if err != nil { @@ -59,16 +59,21 @@ func (g *ProcStatusName) Collect(gpMap map[string]*grouped_proc.GroupedProc, ena name = matches[1] } } - _, ok := gpMap[name] + var ( + gproc *grouped_proc.GroupedProc + ok bool + ) + gproc, ok = gprocs.Load(name) if !ok { - gpMap[name] = grouped_proc.NewGroupedProc(enabled) + gproc = grouped_proc.NewGroupedProc(enabled) + gprocs.Store(name, gproc) } - gpMap[name].Exists = true + gproc.Exists = true wg.Add(1) go func(wg *sync.WaitGroup, pid int, g *grouped_proc.GroupedProc) { _ = g.AppendProcAndCollect(pid) wg.Done() - }(wg, pid, gpMap[name]) + }(wg, pid, gproc) } wg.Wait() return nil diff --git a/grouper/proc_status_name/proc_status_name_test.go b/grouper/proc_status_name/proc_status_name_test.go index f16c557..e8fc2fd 100644 --- a/grouper/proc_status_name/proc_status_name_test.go +++ b/grouper/proc_status_name/proc_status_name_test.go @@ -14,23 +14,23 @@ const ( func TestCollect(t *testing.T) { procStatusName := testProcStatusName() - gpMap := make(map[string]*grouped_proc.GroupedProc) + gprocs := grouped_proc.NewGroupedProcs() enabled := make(map[metric.MetricKey]bool) enabled[metric.ProcIO] = true enabled[metric.ProcStat] = true - err := procStatusName.Collect(gpMap, enabled) + err := procStatusName.Collect(gprocs, enabled) if err != nil { t.Fatalf("%v", err) } - if len(gpMap) != 2 { - t.Errorf("want %d, got %d", 2, len(gpMap)) + if gprocs.Length() != 2 { + t.Errorf("want %d, got %d", 2, gprocs.Length()) } - if _, ok := gpMap["nginx"]; !ok { + if _, ok := gprocs.Load("nginx"); !ok { t.Errorf("want %s, got none", "nginx") } - if _, ok := gpMap["mysqld"]; !ok { + if _, ok := gprocs.Load("mysqld"); !ok { t.Errorf("want %s, got none", "mysqld") } } @@ -41,23 +41,23 @@ func TestCollectWithNormalize(t *testing.T) { if err != nil { t.Fatalf("%v", err) } - gpMap := make(map[string]*grouped_proc.GroupedProc) + gprocs := grouped_proc.NewGroupedProcs() enabled := make(map[metric.MetricKey]bool) enabled[metric.ProcIO] = true enabled[metric.ProcStat] = true - err = procStatusName.Collect(gpMap, enabled) + err = procStatusName.Collect(gprocs, enabled) if err != nil { t.Fatalf("%v", err) } - if len(gpMap) != 2 { - t.Errorf("want %d, got %d", 2, len(gpMap)) + if gprocs.Length() != 2 { + t.Errorf("want %d, got %d", 2, gprocs.Length()) } - if _, ok := gpMap["nginx"]; !ok { + if _, ok := gprocs.Load("nginx"); !ok { t.Errorf("want %s, got none", "nginx") } - if _, ok := gpMap["mysql"]; !ok { + if _, ok := gprocs.Load("mysql"); !ok { t.Errorf("want %s, got none", "mysql") } } diff --git a/metric/metric.go b/metric/metric.go index 1124488..2524f72 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -40,3 +40,12 @@ func AvairableMetrics() map[MetricKey]Metric { return metrics } + +func DefaultEnabledMetrics() map[MetricKey]bool { + enabled := make(map[MetricKey]bool) + for _, k := range MetricKeys { + enabled[k] = false + } + enabled[ProcProcs] = true + return enabled +}