Skip to content

Commit

Permalink
Implement timeouts for all exec command runners
Browse files Browse the repository at this point in the history
First is to write an internal CombinedOutput and Run function with a
timeout.

Second, the following instances of command runners need to have timeouts:

    plugins/inputs/ping/ping.go
    125:	out, err := c.CombinedOutput()

    plugins/inputs/exec/exec.go
    91:	if err := cmd.Run(); err != nil {

    plugins/inputs/ipmi_sensor/command.go
    31:	err := cmd.Run()

    plugins/inputs/sysstat/sysstat.go
    194:	out, err := cmd.CombinedOutput()

    plugins/inputs/leofs/leofs.go
    185:	defer cmd.Wait()

    plugins/inputs/sysstat/sysstat.go
    282:	if err := cmd.Wait(); err != nil {

closes #1067
  • Loading branch information
sparrc committed Apr 29, 2016
1 parent cbe32c7 commit 3f807a9
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 55 deletions.
56 changes: 54 additions & 2 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,29 @@ package internal

import (
"bufio"
"bytes"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"strings"
"time"
"unicode"
)

const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"

var (
TimeoutErr = errors.New("Command timed out.")

NotImplementedError = errors.New("not implemented yet")
)

// Duration just wraps time.Duration
type Duration struct {
Duration time.Duration
Expand All @@ -33,8 +42,6 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
return nil
}

var NotImplementedError = errors.New("not implemented yet")

// ReadLines reads contents from a file and splits them by new lines.
// A convenience wrapper to ReadLinesOffsetN(filename, 0, -1).
func ReadLines(filename string) ([]string, error) {
Expand Down Expand Up @@ -139,3 +146,48 @@ func SnakeCase(in string) string {

return string(out)
}

// CombinedOutputTimeout runs the given command with the given timeout and
// returns the combined output of stdout and stderr.
// If the command times out, it attempts to kill the process.
func CombinedOutputTimeout(c *exec.Cmd, timeout time.Duration) ([]byte, error) {
var b bytes.Buffer
c.Stdout = &b
c.Stderr = &b
if err := c.Start(); err != nil {
return nil, err
}
err := WaitTimeout(c, timeout)
return b.Bytes(), err
}

// RunTimeout runs the given command with the given timeout.
// If the command times out, it attempts to kill the process.
func RunTimeout(c *exec.Cmd, timeout time.Duration) error {
if err := c.Start(); err != nil {
return err
}
return WaitTimeout(c, timeout)
}

// WaitTimeout waits for the given command to finish with a timeout.
// It assumes the command has already been started.
// If the command times out, it attempts to kill the process.
func WaitTimeout(c *exec.Cmd, timeout time.Duration) error {
timer := time.NewTimer(timeout)
done := make(chan error)
go func() { done <- c.Wait() }()
select {
case err := <-done:
timer.Stop()
return err
case <-timer.C:
if err := c.Process.Kill(); err != nil {
log.Printf("FATAL error killing process: %s", err)
return err
}
// wait for the command to return after killing it
<-done
return TimeoutErr
}
}
78 changes: 77 additions & 1 deletion internal/internal_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package internal

import "testing"
import (
"os/exec"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

type SnakeTest struct {
input string
Expand Down Expand Up @@ -30,3 +36,73 @@ func TestSnakeCase(t *testing.T) {
}
}
}

var (
sleepbin, _ = exec.LookPath("sleep")
echobin, _ = exec.LookPath("echo")
)

func TestRunTimeout(t *testing.T) {
if sleepbin == "" {
t.Skip("'sleep' binary not available on OS, skipping.")
}
cmd := exec.Command(sleepbin, "10")
start := time.Now()
err := RunTimeout(cmd, time.Millisecond*20)
elapsed := time.Since(start)

assert.Equal(t, TimeoutErr, err)
// Verify that command gets killed in 20ms, with some breathing room
assert.True(t, elapsed < time.Millisecond*75)
}

func TestCombinedOutputTimeout(t *testing.T) {
if sleepbin == "" {
t.Skip("'sleep' binary not available on OS, skipping.")
}
cmd := exec.Command(sleepbin, "10")
start := time.Now()
_, err := CombinedOutputTimeout(cmd, time.Millisecond*20)
elapsed := time.Since(start)

assert.Equal(t, TimeoutErr, err)
// Verify that command gets killed in 20ms, with some breathing room
assert.True(t, elapsed < time.Millisecond*75)
}

func TestCombinedOutput(t *testing.T) {
if echobin == "" {
t.Skip("'echo' binary not available on OS, skipping.")
}
cmd := exec.Command(echobin, "foo")
out, err := CombinedOutputTimeout(cmd, time.Second)

assert.NoError(t, err)
assert.Equal(t, "foo\n", string(out))
}

// test that CombinedOutputTimeout and exec.Cmd.CombinedOutput return
// the same output from a failed command.
func TestCombinedOutputError(t *testing.T) {
if sleepbin == "" {
t.Skip("'sleep' binary not available on OS, skipping.")
}
cmd := exec.Command(sleepbin, "foo")
expected, err := cmd.CombinedOutput()

cmd2 := exec.Command(sleepbin, "foo")
actual, err := CombinedOutputTimeout(cmd2, time.Second)

assert.Error(t, err)
assert.Equal(t, expected, actual)
}

func TestRunError(t *testing.T) {
if sleepbin == "" {
t.Skip("'sleep' binary not available on OS, skipping.")
}
cmd := exec.Command(sleepbin, "foo")
err := RunTimeout(cmd, time.Second)

assert.Error(t, err)
}
17 changes: 14 additions & 3 deletions plugins/inputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"os/exec"
"sync"
"syscall"
"time"

"github.com/gonuts/go-shellquote"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
Expand All @@ -19,6 +21,9 @@ const sampleConfig = `
## Commands array
commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
## Timeout for each command to complete.
timeout = "5s"
## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
Expand All @@ -32,6 +37,7 @@ const sampleConfig = `
type Exec struct {
Commands []string
Command string
Timeout internal.Duration

parser parsers.Parser

Expand All @@ -43,7 +49,8 @@ type Exec struct {

func NewExec() *Exec {
return &Exec{
runner: CommandRunner{},
runner: CommandRunner{},
Timeout: internal.Duration{Duration: time.Second * 5},
}
}

Expand Down Expand Up @@ -73,7 +80,11 @@ func AddNagiosState(exitCode error, acc telegraf.Accumulator) error {
return nil
}

func (c CommandRunner) Run(e *Exec, command string, acc telegraf.Accumulator) ([]byte, error) {
func (c CommandRunner) Run(
e *Exec,
command string,
acc telegraf.Accumulator,
) ([]byte, error) {
split_cmd, err := shellquote.Split(command)
if err != nil || len(split_cmd) == 0 {
return nil, fmt.Errorf("exec: unable to parse command, %s", err)
Expand All @@ -84,7 +95,7 @@ func (c CommandRunner) Run(e *Exec, command string, acc telegraf.Accumulator) ([
var out bytes.Buffer
cmd.Stdout = &out

if err := cmd.Run(); err != nil {
if err := internal.RunTimeout(cmd, e.Timeout.Duration); err != nil {
switch e.parser.(type) {
case *nagios.NagiosParser:
AddNagiosState(err, acc)
Expand Down
15 changes: 6 additions & 9 deletions plugins/inputs/ipmi_sensor/command.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package ipmi_sensor

import (
"bytes"
"fmt"
"os/exec"
"strings"
"time"

"github.com/influxdata/telegraf/internal"
)

type CommandRunner struct{}
Expand All @@ -18,21 +20,16 @@ func (t CommandRunner) cmd(conn *Connection, args ...string) *exec.Cmd {
}

return exec.Command(path, opts...)

}

func (t CommandRunner) Run(conn *Connection, args ...string) (string, error) {
cmd := t.cmd(conn, args...)
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr

err := cmd.Run()
output, err := internal.CombinedOutputTimeout(cmd, time.Second*5)
if err != nil {
return "", fmt.Errorf("run %s %s: %s (%s)",
cmd.Path, strings.Join(cmd.Args, " "), stderr.String(), err)
cmd.Path, strings.Join(cmd.Args, " "), string(output), err)
}

return stdout.String(), err
return string(output), err
}
15 changes: 11 additions & 4 deletions plugins/inputs/leofs/leofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package leofs
import (
"bufio"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"net/url"
"os/exec"
"strconv"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)

const oid = ".1.3.6.1.4.1.35450"
Expand Down Expand Up @@ -175,14 +178,18 @@ func (l *LeoFS) Gather(acc telegraf.Accumulator) error {
return outerr
}

func (l *LeoFS) gatherServer(endpoint string, serverType ServerType, acc telegraf.Accumulator) error {
func (l *LeoFS) gatherServer(
endpoint string,
serverType ServerType,
acc telegraf.Accumulator,
) error {
cmd := exec.Command("snmpwalk", "-v2c", "-cpublic", endpoint, oid)
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
cmd.Start()
defer cmd.Wait()
defer internal.WaitTimeout(cmd, time.Second*5)
scanner := bufio.NewScanner(stdout)
if !scanner.Scan() {
return fmt.Errorf("Unable to retrieve the node name")
Expand Down
11 changes: 7 additions & 4 deletions plugins/inputs/ping/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)

// HostPinger is a function that runs the "ping" function using a list of
// passed arguments. This can be easily switched with a mocked ping function
// for unit test purposes (see ping_test.go)
type HostPinger func(args ...string) (string, error)
type HostPinger func(timeout float64, args ...string) (string, error)

type Ping struct {
// Interval at which to ping (ping -i <INTERVAL>)
Expand Down Expand Up @@ -74,7 +76,7 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error {
go func(u string) {
defer wg.Done()
args := p.args(u)
out, err := p.pingHost(args...)
out, err := p.pingHost(p.Timeout, args...)
if err != nil {
// Combine go err + stderr output
errorChannel <- errors.New(
Expand Down Expand Up @@ -116,13 +118,14 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error {
return errors.New(strings.Join(errorStrings, "\n"))
}

func hostPinger(args ...string) (string, error) {
func hostPinger(timeout float64, args ...string) (string, error) {
bin, err := exec.LookPath("ping")
if err != nil {
return "", err
}
c := exec.Command(bin, args...)
out, err := c.CombinedOutput()
out, err := internal.CombinedOutputTimeout(c,
time.Second*time.Duration(timeout+1))
return string(out), err
}

Expand Down
8 changes: 4 additions & 4 deletions plugins/inputs/ping/ping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestArgs(t *testing.T) {
"Expected: %s Actual: %s", expected, actual)
}

func mockHostPinger(args ...string) (string, error) {
func mockHostPinger(timeout float64, args ...string) (string, error) {
return linuxPingOutput, nil
}

Expand Down Expand Up @@ -161,7 +161,7 @@ PING www.google.com (216.58.218.164) 56(84) bytes of data.
rtt min/avg/max/mdev = 35.225/44.033/51.806/5.325 ms
`

func mockLossyHostPinger(args ...string) (string, error) {
func mockLossyHostPinger(timeout float64, args ...string) (string, error) {
return lossyPingOutput, nil
}

Expand Down Expand Up @@ -192,7 +192,7 @@ Request timeout for icmp_seq 0
2 packets transmitted, 0 packets received, 100.0% packet loss
`

func mockErrorHostPinger(args ...string) (string, error) {
func mockErrorHostPinger(timeout float64, args ...string) (string, error) {
return errorPingOutput, errors.New("No packets received")
}

Expand All @@ -215,7 +215,7 @@ func TestBadPingGather(t *testing.T) {
acc.AssertContainsTaggedFields(t, "ping", fields, tags)
}

func mockFatalHostPinger(args ...string) (string, error) {
func mockFatalHostPinger(timeout float64, args ...string) (string, error) {
return fatalPingOutput, errors.New("So very bad")
}

Expand Down
Loading

0 comments on commit 3f807a9

Please sign in to comment.