Skip to content

Commit

Permalink
Merge pull request #6 from k1LoW/concurrent-map
Browse files Browse the repository at this point in the history
Fix concurrent map (map[string]*grouped_proc.GroupedProc) writes
  • Loading branch information
k1LoW authored Sep 4, 2019
2 parents 18ff9bf + af7d001 commit 51057ca
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 54 deletions.
17 changes: 9 additions & 8 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type GroupedProcCollector struct {
GroupedProcs map[string]*grouped_proc.GroupedProc
GroupedProcs *grouped_proc.GroupedProcs
Metrics map[metric.MetricKey]metric.Metric
Enabled map[metric.MetricKey]bool
Grouper grouper.Grouper
Expand All @@ -30,24 +30,25 @@ func (c *GroupedProcCollector) Describe(ch chan<- *prometheus.Desc) {

func (c *GroupedProcCollector) Collect(ch chan<- prometheus.Metric) {
_ = c.Grouper.Collect(c.GroupedProcs, c.Enabled)
for group, proc := range c.GroupedProcs {
c.GroupedProcs.Range(func(group string, proc *grouped_proc.GroupedProc) bool {
log.Debugf("Collect grouped process: %s: %#v\n", group, proc)
if !proc.Exists {
delete(c.GroupedProcs, group)
c.GroupedProcs.Delete(group)
log.Debugf("Delete grouped process: %s\n", group)
continue
return true
}
for key, metric := range proc.Metrics {
if proc.Enabled[key] {
err := metric.PushCollected(ch, c.descs, c.Grouper.Name(), group)
if err != nil {
// TODO: metric.PushDefaultMetric(ch, c.descs, c.Grouper.Name(), group)
continue
return true
}
}
}
proc.Exists = false
}
return true
})
}

func (c *GroupedProcCollector) EnableMetric(metric metric.MetricKey) {
Expand All @@ -61,9 +62,9 @@ func (c *GroupedProcCollector) DisableMetric(metric metric.MetricKey) {
// NewGroupedProcCollector
func NewGroupedProcCollector(g grouper.Grouper) (*GroupedProcCollector, error) {
return &GroupedProcCollector{
GroupedProcs: make(map[string]*grouped_proc.GroupedProc),
GroupedProcs: grouped_proc.NewGroupedProcs(),
Metrics: metric.AvairableMetrics(),
Enabled: grouped_proc.DefaultEnabledMetrics(),
Enabled: metric.DefaultEnabledMetrics(),
Grouper: g,
descs: make(map[string]*prometheus.Desc),
}, nil
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ require (
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.5
github.com/stretchr/testify v1.4.0 // indirect
golang.org/x/sys v0.0.0-20190902133755-9109b7679e13 // indirect
golang.org/x/sys v0.0.0-20190904005037-43c01164e931 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190902133755-9109b7679e13 h1:tdsQdquKbTNMsSZLqnLELJGzCANp9oXhu6zFBW6ODx4=
golang.org/x/sys v0.0.0-20190902133755-9109b7679e13/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904005037-43c01164e931 h1:+WYfosiOJzB4BjsISl1Rv4ZLUy+VYcF+u+0Y9jcerv8=
golang.org/x/sys v0.0.0-20190904005037-43c01164e931/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
Expand Down
48 changes: 39 additions & 9 deletions grouped_proc/grouped_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ func NewGroupedProc(enabled map[metric.MetricKey]bool) *GroupedProc {
}
}

func DefaultEnabledMetrics() map[metric.MetricKey]bool {
enabled := make(map[metric.MetricKey]bool)
for _, k := range metric.MetricKeys {
enabled[k] = false
}
enabled[metric.ProcProcs] = true
return enabled
}

func (g *GroupedProc) AppendProcAndCollect(pid int) error {
fs, err := procfs.NewFS(g.ProcMountPoint)
if err != nil {
Expand All @@ -61,3 +52,42 @@ func (g *GroupedProc) AppendProcAndCollect(pid int) error {

return nil
}

type GroupedProcs struct {
sm sync.Map
}

func (m *GroupedProcs) Load(group string) (*GroupedProc, bool) {
gproc, ok := m.sm.Load(group)
if !ok {
return nil, false
}
return gproc.(*GroupedProc), true
}

func (m *GroupedProcs) Store(group string, gproc *GroupedProc) {
m.sm.Store(group, gproc)
}

func (m *GroupedProcs) Delete(group string) {
m.sm.Delete(group)
}

func (m *GroupedProcs) Range(f func(group string, gproc *GroupedProc) bool) {
m.sm.Range(func(key, value interface{}) bool {
return f(key.(string), value.(*GroupedProc))
})
}

func (m *GroupedProcs) Length() int {
l := 0
m.Range(func(group string, gproc *GroupedProc) bool {
l = l + 1
return true
})
return l
}

func NewGroupedProcs() *GroupedProcs {
return &GroupedProcs{}
}
15 changes: 10 additions & 5 deletions grouper/cgroup/cgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (c *Cgroup) Name() string {
return "cgroup"
}

func (c *Cgroup) Collect(gpMap map[string]*grouped_proc.GroupedProc, enabled map[metric.MetricKey]bool) error {
func (c *Cgroup) Collect(gprocs *grouped_proc.GroupedProcs, enabled map[metric.MetricKey]bool) error {
wg := &sync.WaitGroup{}

for _, s := range Subsystems {
Expand All @@ -68,11 +68,16 @@ func (c *Cgroup) Collect(gpMap map[string]*grouped_proc.GroupedProc, enabled map
_ = f.Close()
return nil
}
_, ok := gpMap[cPath]
var (
gproc *grouped_proc.GroupedProc
ok bool
)
gproc, ok = gprocs.Load(cPath)
if !ok {
gpMap[cPath] = grouped_proc.NewGroupedProc(enabled)
gproc = grouped_proc.NewGroupedProc(enabled)
gprocs.Store(cPath, gproc)
}
gpMap[cPath].Exists = true
gproc.Exists = true
reader := bufio.NewReaderSize(f, 1028)
for {
line, _, err := reader.ReadLine()
Expand All @@ -92,7 +97,7 @@ func (c *Cgroup) Collect(gpMap map[string]*grouped_proc.GroupedProc, enabled map
go func(wg *sync.WaitGroup, pid int, g *grouped_proc.GroupedProc) {
_ = g.AppendProcAndCollect(pid)
wg.Done()
}(wg, pid, gpMap[cPath])
}(wg, pid, gproc)
}
}
return nil
Expand Down
22 changes: 11 additions & 11 deletions grouper/cgroup/cgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,23 @@ const (

func TestCollect(t *testing.T) {
cgroup := testCgroup()
gpMap := make(map[string]*grouped_proc.GroupedProc)
gprocs := grouped_proc.NewGroupedProcs()
enabled := make(map[metric.MetricKey]bool)

enabled[metric.ProcIO] = true
enabled[metric.ProcStat] = true
err := cgroup.Collect(gpMap, enabled)
err := cgroup.Collect(gprocs, enabled)
if err != nil {
t.Fatalf("%v", err)
}

if len(gpMap) != 2 {
t.Errorf("want %d, got %d", 2, len(gpMap))
if gprocs.Length() != 2 {
t.Errorf("want %d, got %d", 2, gprocs.Length())
}
if _, ok := gpMap["/system.slice/nginx.service"]; !ok {
if _, ok := gprocs.Load("/system.slice/nginx.service"); !ok {
t.Errorf("want %s, got none", "/system.slice/nginx.service")
}
if _, ok := gpMap["/system.slice/mysql.service"]; !ok {
if _, ok := gprocs.Load("/system.slice/mysql.service"); !ok {
t.Errorf("want %s, got none", "/system.slice/mysql.service")
}
}
Expand All @@ -42,20 +42,20 @@ func TestCollectWithNormalize(t *testing.T) {
if err != nil {
t.Fatalf("%v", err)
}
gpMap := make(map[string]*grouped_proc.GroupedProc)
gprocs := grouped_proc.NewGroupedProcs()
enabled := make(map[metric.MetricKey]bool)

enabled[metric.ProcIO] = true
enabled[metric.ProcStat] = true
err = cgroup.Collect(gpMap, enabled)
err = cgroup.Collect(gprocs, enabled)
if err != nil {
t.Fatalf("%v", err)
}

if len(gpMap) != 1 {
t.Errorf("want %d, got %d", 1, len(gpMap))
if gprocs.Length() != 1 {
t.Errorf("want %d, got %d", 1, gprocs.Length())
}
if _, ok := gpMap["system.slice"]; !ok {
if _, ok := gprocs.Load("system.slice"); !ok {
t.Errorf("want %s, got none", "system.slice")
}
}
Expand Down
2 changes: 1 addition & 1 deletion grouper/grouper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import (
type Grouper interface {
Name() string
SetNormalizeRegexp(nReStr string) error
Collect(gpMap map[string]*grouped_proc.GroupedProc, enabled map[metric.MetricKey]bool) error
Collect(gprocs *grouped_proc.GroupedProcs, enabled map[metric.MetricKey]bool) error
}
15 changes: 10 additions & 5 deletions grouper/proc_status_name/proc_status_name.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (g *ProcStatusName) Name() string {
return "proc_status_name"
}

func (g *ProcStatusName) Collect(gpMap map[string]*grouped_proc.GroupedProc, enabled map[metric.MetricKey]bool) error {
func (g *ProcStatusName) Collect(gprocs *grouped_proc.GroupedProcs, enabled map[metric.MetricKey]bool) error {
wg := &sync.WaitGroup{}
fs, err := procfs.NewFS(g.procMountPoint)
if err != nil {
Expand Down Expand Up @@ -59,16 +59,21 @@ func (g *ProcStatusName) Collect(gpMap map[string]*grouped_proc.GroupedProc, ena
name = matches[1]
}
}
_, ok := gpMap[name]
var (
gproc *grouped_proc.GroupedProc
ok bool
)
gproc, ok = gprocs.Load(name)
if !ok {
gpMap[name] = grouped_proc.NewGroupedProc(enabled)
gproc = grouped_proc.NewGroupedProc(enabled)
gprocs.Store(name, gproc)
}
gpMap[name].Exists = true
gproc.Exists = true
wg.Add(1)
go func(wg *sync.WaitGroup, pid int, g *grouped_proc.GroupedProc) {
_ = g.AppendProcAndCollect(pid)
wg.Done()
}(wg, pid, gpMap[name])
}(wg, pid, gproc)
}
wg.Wait()
return nil
Expand Down
24 changes: 12 additions & 12 deletions grouper/proc_status_name/proc_status_name_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,23 @@ const (

func TestCollect(t *testing.T) {
procStatusName := testProcStatusName()
gpMap := make(map[string]*grouped_proc.GroupedProc)
gprocs := grouped_proc.NewGroupedProcs()
enabled := make(map[metric.MetricKey]bool)

enabled[metric.ProcIO] = true
enabled[metric.ProcStat] = true
err := procStatusName.Collect(gpMap, enabled)
err := procStatusName.Collect(gprocs, enabled)
if err != nil {
t.Fatalf("%v", err)
}

if len(gpMap) != 2 {
t.Errorf("want %d, got %d", 2, len(gpMap))
if gprocs.Length() != 2 {
t.Errorf("want %d, got %d", 2, gprocs.Length())
}
if _, ok := gpMap["nginx"]; !ok {
if _, ok := gprocs.Load("nginx"); !ok {
t.Errorf("want %s, got none", "nginx")
}
if _, ok := gpMap["mysqld"]; !ok {
if _, ok := gprocs.Load("mysqld"); !ok {
t.Errorf("want %s, got none", "mysqld")
}
}
Expand All @@ -41,23 +41,23 @@ func TestCollectWithNormalize(t *testing.T) {
if err != nil {
t.Fatalf("%v", err)
}
gpMap := make(map[string]*grouped_proc.GroupedProc)
gprocs := grouped_proc.NewGroupedProcs()
enabled := make(map[metric.MetricKey]bool)

enabled[metric.ProcIO] = true
enabled[metric.ProcStat] = true
err = procStatusName.Collect(gpMap, enabled)
err = procStatusName.Collect(gprocs, enabled)
if err != nil {
t.Fatalf("%v", err)
}

if len(gpMap) != 2 {
t.Errorf("want %d, got %d", 2, len(gpMap))
if gprocs.Length() != 2 {
t.Errorf("want %d, got %d", 2, gprocs.Length())
}
if _, ok := gpMap["nginx"]; !ok {
if _, ok := gprocs.Load("nginx"); !ok {
t.Errorf("want %s, got none", "nginx")
}
if _, ok := gpMap["mysql"]; !ok {
if _, ok := gprocs.Load("mysql"); !ok {
t.Errorf("want %s, got none", "mysql")
}
}
Expand Down
9 changes: 9 additions & 0 deletions metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,12 @@ func AvairableMetrics() map[MetricKey]Metric {

return metrics
}

func DefaultEnabledMetrics() map[MetricKey]bool {
enabled := make(map[MetricKey]bool)
for _, k := range MetricKeys {
enabled[k] = false
}
enabled[ProcProcs] = true
return enabled
}

0 comments on commit 51057ca

Please sign in to comment.