Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow globs in FPM unix socket paths #7089

Merged
merged 7 commits into from
Feb 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions plugins/inputs/phpfpm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Get phpfpm stats using either HTTP status page or fpm socket.
## "/var/run/php5-fpm.sock"
## or using a custom fpm status path:
## "/var/run/php5-fpm.sock:fpm-custom-status-path"
## glob patterns are also supported:
## "/var/run/php*.sock"
##
## - fcgi: the URL must start with fcgi:// or cgi://, and port must be present, ie:
## "fcgi://10.0.0.12:9000/status"
Expand Down
84 changes: 73 additions & 11 deletions plugins/inputs/phpfpm/phpfpm.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)
Expand Down Expand Up @@ -95,7 +96,12 @@ func (g *phpfpm) Gather(acc telegraf.Accumulator) error {

var wg sync.WaitGroup

for _, serv := range g.Urls {
urls, err := expandUrls(g.Urls)
if err != nil {
return err
}

for _, serv := range urls {
wg.Add(1)
go func(serv string) {
defer wg.Done()
Expand Down Expand Up @@ -153,18 +159,10 @@ func (g *phpfpm) gatherServer(addr string, acc telegraf.Accumulator) error {
statusPath = "status"
}
} else {
socketAddr := strings.Split(addr, ":")
if len(socketAddr) >= 2 {
socketPath = socketAddr[0]
statusPath = socketAddr[1]
} else {
socketPath = socketAddr[0]
socketPath, statusPath = unixSocketPaths(addr)
if statusPath == "" {
statusPath = "status"
}

if _, err := os.Stat(socketPath); os.IsNotExist(err) {
return fmt.Errorf("Socket doesn't exist '%s': %s", socketPath, err)
}
fcgi, err = newFcgiClient("unix", socketPath)
}

Expand Down Expand Up @@ -277,6 +275,70 @@ func importMetric(r io.Reader, acc telegraf.Accumulator, addr string) (poolStat,
return stats, nil
}

func expandUrls(urls []string) ([]string, error) {
addrs := make([]string, 0, len(urls))
for _, url := range urls {
if isNetworkURL(url) {
addrs = append(addrs, url)
continue
}
paths, err := globUnixSocket(url)
if err != nil {
return nil, err
}
addrs = append(addrs, paths...)
}
return addrs, nil
}

func globUnixSocket(url string) ([]string, error) {
pattern, status := unixSocketPaths(url)
glob, err := globpath.Compile(pattern)
if err != nil {
return nil, fmt.Errorf("could not compile glob %q: %v", pattern, err)
}
paths := glob.Match()
if len(paths) == 0 {
if _, err := os.Stat(paths[0]); err != nil {
if os.IsNotExist(err) {
return nil, fmt.Errorf("Socket doesn't exist '%s': %s", pattern, err)
}
return nil, err
}
return nil, nil
}

addrs := make([]string, 0, len(paths))

for _, path := range paths {
if status != "" {
status = fmt.Sprintf(":%s", status)
}
addrs = append(addrs, fmt.Sprintf("%s%s", path, status))
}

return addrs, nil
}

func unixSocketPaths(addr string) (string, string) {
var socketPath, statusPath string

socketAddr := strings.Split(addr, ":")
if len(socketAddr) >= 2 {
socketPath = socketAddr[0]
statusPath = socketAddr[1]
} else {
socketPath = socketAddr[0]
statusPath = ""
}

return socketPath, statusPath
}

func isNetworkURL(addr string) bool {
return strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") || strings.HasPrefix(addr, "fcgi://") || strings.HasPrefix(addr, "cgi://")
}

func init() {
inputs.Add("phpfpm", func() telegraf.Input {
return &phpfpm{}
Expand Down
67 changes: 66 additions & 1 deletion plugins/inputs/phpfpm/phpfpm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,71 @@ func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) {
acc.AssertContainsTaggedFields(t, "phpfpm", fields, tags)
}

func TestPhpFpmGeneratesMetrics_From_Multiple_Sockets_With_Glob(t *testing.T) {
// Create a socket in /tmp because we always have write permission and if the
// removing of socket fail when system restart /tmp is clear so
// we don't have junk files around
var randomNumber int64
binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
socket1 := fmt.Sprintf("/tmp/test-fpm%d.sock", randomNumber)
tcp1, err := net.Listen("unix", socket1)
if err != nil {
t.Fatal("Cannot initialize server on port ")
}
defer tcp1.Close()

binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
socket2 := fmt.Sprintf("/tmp/test-fpm%d.sock", randomNumber)
tcp2, err := net.Listen("unix", socket2)
if err != nil {
t.Fatal("Cannot initialize server on port ")
}
defer tcp2.Close()

s := statServer{}
go fcgi.Serve(tcp1, s)
go fcgi.Serve(tcp2, s)

r := &phpfpm{
Urls: []string{"/tmp/test-fpm[\\-0-9]*.sock"},
}

var acc1, acc2 testutil.Accumulator

err = acc1.GatherError(r.Gather)
require.NoError(t, err)

err = acc2.GatherError(r.Gather)
require.NoError(t, err)

tags1 := map[string]string{
"pool": "www",
"url": socket1,
}

tags2 := map[string]string{
"pool": "www",
"url": socket2,
}

fields := map[string]interface{}{
"start_since": int64(1991),
"accepted_conn": int64(3),
"listen_queue": int64(1),
"max_listen_queue": int64(0),
"listen_queue_len": int64(0),
"idle_processes": int64(1),
"active_processes": int64(1),
"total_processes": int64(2),
"max_active_processes": int64(1),
"max_children_reached": int64(2),
"slow_requests": int64(1),
}

acc1.AssertContainsTaggedFields(t, "phpfpm", fields, tags1)
acc2.AssertContainsTaggedFields(t, "phpfpm", fields, tags2)
}

func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) {
// Create a socket in /tmp because we always have write permission. If the
// removing of socket fail we won't have junk files around. Cuz when system
Expand Down Expand Up @@ -227,7 +292,7 @@ func TestPhpFpmGeneratesMetrics_Throw_Error_When_Socket_Path_Is_Invalid(t *testi

err := acc.GatherError(r.Gather)
require.Error(t, err)
assert.Equal(t, `Socket doesn't exist '/tmp/invalid.sock': stat /tmp/invalid.sock: no such file or directory`, err.Error())
assert.Equal(t, `dial unix /tmp/invalid.sock: connect: no such file or directory`, err.Error())

}

Expand Down