Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(inputs.phpfpm): Continue despite erroneous sockets #14852

Merged
merged 2 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 14 additions & 19 deletions plugins/inputs/phpfpm/phpfpm.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,24 +73,28 @@ type JSONMetrics struct {
} `json:"processes"`
}

type metric map[string]int64
type poolStat map[string]metric
type metricStat map[string]int64
type poolStat map[string]metricStat

type phpfpm struct {
Format string `toml:"format"`
Timeout config.Duration `toml:"timeout"`
Urls []string `toml:"urls"`

Log telegraf.Logger `toml:"-"`
tls.ClientConfig

client *http.Client
Log telegraf.Logger
}

func (*phpfpm) SampleConfig() string {
return sampleConfig
}

func (p *phpfpm) Init() error {
if len(p.Urls) == 0 {
p.Urls = []string{"http://127.0.0.1/status"}
}

tlsCfg, err := p.ClientConfig.TLSConfig()
if err != nil {
return err
Expand All @@ -117,18 +121,8 @@ func (p *phpfpm) Init() error {
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (p *phpfpm) Gather(acc telegraf.Accumulator) error {
if len(p.Urls) == 0 {
return p.gatherServer("http://127.0.0.1/status", acc)
}

var wg sync.WaitGroup

urls, err := expandUrls(p.Urls)
if err != nil {
return err
}

for _, serv := range urls {
for _, serv := range expandUrls(acc, p.Urls) {
wg.Add(1)
go func(serv string) {
defer wg.Done()
Expand Down Expand Up @@ -259,7 +253,7 @@ func parseLines(r io.Reader, acc telegraf.Accumulator, addr string) {
// We start to gather data for a new pool here
if fieldName == PfPool {
currentPool = strings.Trim(keyvalue[1], " ")
stats[currentPool] = make(metric)
stats[currentPool] = make(metricStat)
continue
}

Expand Down Expand Up @@ -347,7 +341,7 @@ func (p *phpfpm) parseJSON(r io.Reader, acc telegraf.Accumulator, addr string) {
}
}

func expandUrls(urls []string) ([]string, error) {
func expandUrls(acc telegraf.Accumulator, urls []string) []string {
addrs := make([]string, 0, len(urls))
for _, address := range urls {
if isNetworkURL(address) {
Expand All @@ -356,11 +350,12 @@ func expandUrls(urls []string) ([]string, error) {
}
paths, err := globUnixSocket(address)
if err != nil {
return nil, err
acc.AddError(err)
continue
}
addrs = append(addrs, paths...)
}
return addrs, nil
return addrs
}

func globUnixSocket(address string) ([]string, error) {
Expand Down
89 changes: 64 additions & 25 deletions plugins/inputs/phpfpm/phpfpm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ import (
"os"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/common/shim"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
Expand Down Expand Up @@ -49,6 +52,7 @@ func TestPhpFpmGeneratesMetrics_From_Http(t *testing.T) {
url := ts.URL + "?test=ok"
r := &phpfpm{
Urls: []string{url},
Log: &testutil.Logger{},
}

require.NoError(t, r.Init())
Expand Down Expand Up @@ -96,7 +100,7 @@ func TestPhpFpmGeneratesJSONMetrics_From_Http(t *testing.T) {
input := &phpfpm{
Urls: []string{server.URL + "?full&json"},
Format: "json",
Log: testutil.Logger{},
Log: &testutil.Logger{},
}
require.NoError(t, input.Init())

Expand All @@ -117,8 +121,8 @@ func TestPhpFpmGeneratesMetrics_From_Fcgi(t *testing.T) {
//Now we tested again above server
r := &phpfpm{
Urls: []string{"fcgi://" + tcp.Addr().String() + "/status"},
Log: &testutil.Logger{},
}

require.NoError(t, r.Init())

var acc testutil.Accumulator
Expand Down Expand Up @@ -161,12 +165,11 @@ func TestPhpFpmGeneratesMetrics_From_Socket(t *testing.T) {

r := &phpfpm{
Urls: []string{tcp.Addr().String()},
Log: &testutil.Logger{},
}

require.NoError(t, r.Init())

var acc testutil.Accumulator

require.NoError(t, acc.GatherError(r.Gather))

tags := map[string]string{
Expand Down Expand Up @@ -214,14 +217,12 @@ func TestPhpFpmGeneratesMetrics_From_Multiple_Sockets_With_Glob(t *testing.T) {

r := &phpfpm{
Urls: []string{"/tmp/test-fpm[\\-0-9]*.sock"},
Log: &testutil.Logger{},
}

require.NoError(t, r.Init())

var acc1, acc2 testutil.Accumulator

require.NoError(t, acc1.GatherError(r.Gather))

require.NoError(t, acc2.GatherError(r.Gather))

tags1 := map[string]string{
Expand Down Expand Up @@ -267,12 +268,11 @@ func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) {

r := &phpfpm{
Urls: []string{tcp.Addr().String() + ":custom-status-path"},
Log: &testutil.Logger{},
}

require.NoError(t, r.Init())

var acc testutil.Accumulator

require.NoError(t, acc.GatherError(r.Gather))

tags := map[string]string{
Expand Down Expand Up @@ -300,15 +300,14 @@ func TestPhpFpmGeneratesMetrics_From_Socket_Custom_Status_Path(t *testing.T) {
// When not passing server config, we default to localhost
// We just want to make sure we did request stat from localhost
func TestPhpFpmDefaultGetFromLocalhost(t *testing.T) {
r := &phpfpm{Urls: []string{"http://bad.localhost:62001/status"}}

r := &phpfpm{
Urls: []string{"http://bad.localhost:62001/status"},
Log: &testutil.Logger{},
}
require.NoError(t, r.Init())

var acc testutil.Accumulator

err := acc.GatherError(r.Gather)
require.Error(t, err)
require.Contains(t, err.Error(), "/status")
require.ErrorContains(t, acc.GatherError(r.Gather), "/status")
}

func TestPhpFpmGeneratesMetrics_Throw_Error_When_Fpm_Status_Is_Not_Responding(t *testing.T) {
Expand All @@ -318,30 +317,25 @@ func TestPhpFpmGeneratesMetrics_Throw_Error_When_Fpm_Status_Is_Not_Responding(t

r := &phpfpm{
Urls: []string{"http://aninvalidone"},
Log: &testutil.Logger{},
}

require.NoError(t, r.Init())

var acc testutil.Accumulator

err := acc.GatherError(r.Gather)
require.Error(t, err)
require.Contains(t, err.Error(), `unable to connect to phpfpm status page 'http://aninvalidone'`)
require.Contains(t, err.Error(), `lookup aninvalidone`)
require.ErrorContains(t, err, `unable to connect to phpfpm status page 'http://aninvalidone'`)
require.ErrorContains(t, err, `lookup aninvalidone`)
}

func TestPhpFpmGeneratesMetrics_Throw_Error_When_Socket_Path_Is_Invalid(t *testing.T) {
r := &phpfpm{
Urls: []string{"/tmp/invalid.sock"},
Log: &testutil.Logger{},
}

require.NoError(t, r.Init())

var acc testutil.Accumulator

err := acc.GatherError(r.Gather)
require.Error(t, err)
require.Equal(t, `socket doesn't exist "/tmp/invalid.sock"`, err.Error())
require.ErrorContains(t, acc.GatherError(r.Gather), `socket doesn't exist "/tmp/invalid.sock"`)
}

const outputSample = `
Expand Down Expand Up @@ -389,3 +383,48 @@ func TestPhpFpmParseJSON_Log_Error_Without_Panic_When_When_JSON_Is_Invalid(t *te
require.NotPanics(t, func() { p.parseJSON(bytes.NewReader(invalidJSON), &testutil.NopAccumulator{}, "") })
require.Contains(t, logOutput.String(), "E! Unable to decode JSON response: invalid character 'X' looking for beginning of value")
}

func TestGatherDespiteUnavailable(t *testing.T) {
// Let OS find an available port
tcp, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "Cannot initialize test server")
defer tcp.Close()

s := statServer{}
go fcgi.Serve(tcp, s) //nolint:errcheck // ignore the returned error as we cannot do anything about it anyway

//Now we tested again above server
r := &phpfpm{
Urls: []string{"fcgi://" + tcp.Addr().String() + "/status", "/lala"},
Log: &testutil.Logger{},
}
require.NoError(t, r.Init())

expected := []telegraf.Metric{
metric.New(
"phpfpm",
map[string]string{
"pool": "www",
"url": r.Urls[0],
},
map[string]interface{}{
"start_since": int64(1991),
"accepted_conn": int64(3),
"listen_queue": int64(1),
"max_listen_queue": int64(0),
"listen_queue_len": int64(0),
"idle_processes": int64(1),
"active_processes": int64(1),
"total_processes": int64(2),
"max_active_processes": int64(1),
"max_children_reached": int64(2),
"slow_requests": int64(1),
},
time.Unix(0, 0),
),
}

var acc testutil.Accumulator
require.ErrorContains(t, acc.GatherError(r.Gather), "socket doesn't exist")
testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), testutil.IgnoreTime())
}
Loading