Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement timeouts for all exec command runners #1125

Merged
merged 1 commit into from
Apr 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 54 additions & 2 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,29 @@ package internal

import (
"bufio"
"bytes"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"strings"
"time"
"unicode"
)

const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"

var (
TimeoutErr = errors.New("Command timed out.")

NotImplementedError = errors.New("not implemented yet")
)

// Duration just wraps time.Duration
type Duration struct {
Duration time.Duration
Expand All @@ -33,8 +42,6 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
return nil
}

var NotImplementedError = errors.New("not implemented yet")

// ReadLines reads contents from a file and splits them by new lines.
// A convenience wrapper to ReadLinesOffsetN(filename, 0, -1).
func ReadLines(filename string) ([]string, error) {
Expand Down Expand Up @@ -139,3 +146,48 @@ func SnakeCase(in string) string {

return string(out)
}

// CombinedOutputTimeout runs the given command with the given timeout and
// returns the combined output of stdout and stderr.
// If the command times out, it attempts to kill the process.
func CombinedOutputTimeout(c *exec.Cmd, timeout time.Duration) ([]byte, error) {
var b bytes.Buffer
c.Stdout = &b
c.Stderr = &b
if err := c.Start(); err != nil {
return nil, err
}
err := WaitTimeout(c, timeout)
return b.Bytes(), err
}

// RunTimeout runs the given command with the given timeout.
// If the command times out, it attempts to kill the process.
func RunTimeout(c *exec.Cmd, timeout time.Duration) error {
if err := c.Start(); err != nil {
return err
}
return WaitTimeout(c, timeout)
}

// WaitTimeout waits for the given command to finish with a timeout.
// It assumes the command has already been started.
// If the command times out, it attempts to kill the process.
func WaitTimeout(c *exec.Cmd, timeout time.Duration) error {
timer := time.NewTimer(timeout)
done := make(chan error)
go func() { done <- c.Wait() }()
select {
case err := <-done:
timer.Stop()
return err
case <-timer.C:
if err := c.Process.Kill(); err != nil {
log.Printf("FATAL error killing process: %s", err)
return err
}
// wait for the command to return after killing it
<-done
return TimeoutErr
}
}
78 changes: 77 additions & 1 deletion internal/internal_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package internal

import "testing"
import (
"os/exec"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

type SnakeTest struct {
input string
Expand Down Expand Up @@ -30,3 +36,73 @@ func TestSnakeCase(t *testing.T) {
}
}
}

var (
sleepbin, _ = exec.LookPath("sleep")
echobin, _ = exec.LookPath("echo")
)

func TestRunTimeout(t *testing.T) {
if sleepbin == "" {
t.Skip("'sleep' binary not available on OS, skipping.")
}
cmd := exec.Command(sleepbin, "10")
start := time.Now()
err := RunTimeout(cmd, time.Millisecond*20)
elapsed := time.Since(start)

assert.Equal(t, TimeoutErr, err)
// Verify that command gets killed in 20ms, with some breathing room
assert.True(t, elapsed < time.Millisecond*75)
}

func TestCombinedOutputTimeout(t *testing.T) {
if sleepbin == "" {
t.Skip("'sleep' binary not available on OS, skipping.")
}
cmd := exec.Command(sleepbin, "10")
start := time.Now()
_, err := CombinedOutputTimeout(cmd, time.Millisecond*20)
elapsed := time.Since(start)

assert.Equal(t, TimeoutErr, err)
// Verify that command gets killed in 20ms, with some breathing room
assert.True(t, elapsed < time.Millisecond*75)
}

func TestCombinedOutput(t *testing.T) {
if echobin == "" {
t.Skip("'echo' binary not available on OS, skipping.")
}
cmd := exec.Command(echobin, "foo")
out, err := CombinedOutputTimeout(cmd, time.Second)

assert.NoError(t, err)
assert.Equal(t, "foo\n", string(out))
}

// test that CombinedOutputTimeout and exec.Cmd.CombinedOutput return
// the same output from a failed command.
func TestCombinedOutputError(t *testing.T) {
if sleepbin == "" {
t.Skip("'sleep' binary not available on OS, skipping.")
}
cmd := exec.Command(sleepbin, "foo")
expected, err := cmd.CombinedOutput()

cmd2 := exec.Command(sleepbin, "foo")
actual, err := CombinedOutputTimeout(cmd2, time.Second)

assert.Error(t, err)
assert.Equal(t, expected, actual)
}

func TestRunError(t *testing.T) {
if sleepbin == "" {
t.Skip("'sleep' binary not available on OS, skipping.")
}
cmd := exec.Command(sleepbin, "foo")
err := RunTimeout(cmd, time.Second)

assert.Error(t, err)
}
17 changes: 14 additions & 3 deletions plugins/inputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"os/exec"
"sync"
"syscall"
"time"

"github.com/gonuts/go-shellquote"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
Expand All @@ -19,6 +21,9 @@ const sampleConfig = `
## Commands array
commands = ["/tmp/test.sh", "/usr/bin/mycollector --foo=bar"]
## Timeout for each command to complete.
timeout = "5s"
## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
Expand All @@ -32,6 +37,7 @@ const sampleConfig = `
type Exec struct {
Commands []string
Command string
Timeout internal.Duration

parser parsers.Parser

Expand All @@ -43,7 +49,8 @@ type Exec struct {

func NewExec() *Exec {
return &Exec{
runner: CommandRunner{},
runner: CommandRunner{},
Timeout: internal.Duration{Duration: time.Second * 5},
}
}

Expand Down Expand Up @@ -73,7 +80,11 @@ func AddNagiosState(exitCode error, acc telegraf.Accumulator) error {
return nil
}

func (c CommandRunner) Run(e *Exec, command string, acc telegraf.Accumulator) ([]byte, error) {
func (c CommandRunner) Run(
e *Exec,
command string,
acc telegraf.Accumulator,
) ([]byte, error) {
split_cmd, err := shellquote.Split(command)
if err != nil || len(split_cmd) == 0 {
return nil, fmt.Errorf("exec: unable to parse command, %s", err)
Expand All @@ -84,7 +95,7 @@ func (c CommandRunner) Run(e *Exec, command string, acc telegraf.Accumulator) ([
var out bytes.Buffer
cmd.Stdout = &out

if err := cmd.Run(); err != nil {
if err := internal.RunTimeout(cmd, e.Timeout.Duration); err != nil {
switch e.parser.(type) {
case *nagios.NagiosParser:
AddNagiosState(err, acc)
Expand Down
15 changes: 6 additions & 9 deletions plugins/inputs/ipmi_sensor/command.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package ipmi_sensor

import (
"bytes"
"fmt"
"os/exec"
"strings"
"time"

"github.com/influxdata/telegraf/internal"
)

type CommandRunner struct{}
Expand All @@ -18,21 +20,16 @@ func (t CommandRunner) cmd(conn *Connection, args ...string) *exec.Cmd {
}

return exec.Command(path, opts...)

}

func (t CommandRunner) Run(conn *Connection, args ...string) (string, error) {
cmd := t.cmd(conn, args...)
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr

err := cmd.Run()
output, err := internal.CombinedOutputTimeout(cmd, time.Second*5)
if err != nil {
return "", fmt.Errorf("run %s %s: %s (%s)",
cmd.Path, strings.Join(cmd.Args, " "), stderr.String(), err)
cmd.Path, strings.Join(cmd.Args, " "), string(output), err)
}

return stdout.String(), err
return string(output), err
}
15 changes: 11 additions & 4 deletions plugins/inputs/leofs/leofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package leofs
import (
"bufio"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"net/url"
"os/exec"
"strconv"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)

const oid = ".1.3.6.1.4.1.35450"
Expand Down Expand Up @@ -175,14 +178,18 @@ func (l *LeoFS) Gather(acc telegraf.Accumulator) error {
return outerr
}

func (l *LeoFS) gatherServer(endpoint string, serverType ServerType, acc telegraf.Accumulator) error {
func (l *LeoFS) gatherServer(
endpoint string,
serverType ServerType,
acc telegraf.Accumulator,
) error {
cmd := exec.Command("snmpwalk", "-v2c", "-cpublic", endpoint, oid)
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
cmd.Start()
defer cmd.Wait()
defer internal.WaitTimeout(cmd, time.Second*5)
scanner := bufio.NewScanner(stdout)
if !scanner.Scan() {
return fmt.Errorf("Unable to retrieve the node name")
Expand Down
11 changes: 7 additions & 4 deletions plugins/inputs/ping/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)

// HostPinger is a function that runs the "ping" function using a list of
// passed arguments. This can be easily switched with a mocked ping function
// for unit test purposes (see ping_test.go)
type HostPinger func(args ...string) (string, error)
type HostPinger func(timeout float64, args ...string) (string, error)

type Ping struct {
// Interval at which to ping (ping -i <INTERVAL>)
Expand Down Expand Up @@ -74,7 +76,7 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error {
go func(u string) {
defer wg.Done()
args := p.args(u)
out, err := p.pingHost(args...)
out, err := p.pingHost(p.Timeout, args...)
if err != nil {
// Combine go err + stderr output
errorChannel <- errors.New(
Expand Down Expand Up @@ -116,13 +118,14 @@ func (p *Ping) Gather(acc telegraf.Accumulator) error {
return errors.New(strings.Join(errorStrings, "\n"))
}

func hostPinger(args ...string) (string, error) {
func hostPinger(timeout float64, args ...string) (string, error) {
bin, err := exec.LookPath("ping")
if err != nil {
return "", err
}
c := exec.Command(bin, args...)
out, err := c.CombinedOutput()
out, err := internal.CombinedOutputTimeout(c,
time.Second*time.Duration(timeout+1))
return string(out), err
}

Expand Down
8 changes: 4 additions & 4 deletions plugins/inputs/ping/ping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestArgs(t *testing.T) {
"Expected: %s Actual: %s", expected, actual)
}

func mockHostPinger(args ...string) (string, error) {
func mockHostPinger(timeout float64, args ...string) (string, error) {
return linuxPingOutput, nil
}

Expand Down Expand Up @@ -161,7 +161,7 @@ PING www.google.com (216.58.218.164) 56(84) bytes of data.
rtt min/avg/max/mdev = 35.225/44.033/51.806/5.325 ms
`

func mockLossyHostPinger(args ...string) (string, error) {
func mockLossyHostPinger(timeout float64, args ...string) (string, error) {
return lossyPingOutput, nil
}

Expand Down Expand Up @@ -192,7 +192,7 @@ Request timeout for icmp_seq 0
2 packets transmitted, 0 packets received, 100.0% packet loss
`

func mockErrorHostPinger(args ...string) (string, error) {
func mockErrorHostPinger(timeout float64, args ...string) (string, error) {
return errorPingOutput, errors.New("No packets received")
}

Expand All @@ -215,7 +215,7 @@ func TestBadPingGather(t *testing.T) {
acc.AssertContainsTaggedFields(t, "ping", fields, tags)
}

func mockFatalHostPinger(args ...string) (string, error) {
func mockFatalHostPinger(timeout float64, args ...string) (string, error) {
return fatalPingOutput, errors.New("So very bad")
}

Expand Down
Loading