Skip to content

Commit

Permalink
Common exec error handling (#2501)
Browse files Browse the repository at this point in the history
* When error happens during `kube.Exec`, error should capture tail of stdout/stderr streams.

* Move `ExecError` definition from `PodCommandExecutor` to `kube.Exec`

* `PodCommandExecutor` should just pass an `ExecError` produced by `kube.ExecWithOptions`

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
e-sumin and mergify[bot] committed Dec 13, 2023
1 parent 255e5ed commit f435763
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 99 deletions.
61 changes: 57 additions & 4 deletions pkg/kube/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kube
import (
"bytes"
"context"
"fmt"
"io"
"net/url"
"strings"
Expand All @@ -30,6 +31,40 @@ import (
"k8s.io/client-go/tools/remotecommand"
)

// ExecError is an error returned by kube.Exec, kube.ExecOutput and kube.ExecWithOptions.
// It contains not only error happened during an execution, but also keeps tails of stdout/stderr streams.
// These tails could be used by the invoker to construct more precise error.
type ExecError struct {
error
stdout LogTail
stderr LogTail
}

// NewExecError creates an instance of ExecError
func NewExecError(err error, stdout, stderr LogTail) *ExecError {
return &ExecError{
error: err,
stdout: stdout,
stderr: stderr,
}
}

func (e *ExecError) Error() string {
return fmt.Sprintf("%s.\nstdout: %s\nstderr: %s", e.error.Error(), e.Stdout(), e.Stderr())
}

func (e *ExecError) Unwrap() error {
return e.error
}

func (e *ExecError) Stdout() string {
return e.stdout.ToString()
}

func (e *ExecError) Stderr() string {
return e.stderr.ToString()
}

// ExecOptions passed to ExecWithOptions
type ExecOptions struct {
Command []string
Expand Down Expand Up @@ -118,12 +153,25 @@ func execStream(kubeCli kubernetes.Interface, config *restclient.Config, options
req.Param("container", options.ContainerName)
}

stderrTail := NewLogTail(logTailDefaultLength)
stdoutTail := NewLogTail(logTailDefaultLength)

var stdout io.Writer = stdoutTail
if options.Stdout != nil {
stdout = io.MultiWriter(options.Stdout, stdoutTail)
}

var stderr io.Writer = stderrTail
if options.Stderr != nil {
stderr = io.MultiWriter(options.Stderr, stderrTail)
}

req.VersionedParams(&v1.PodExecOptions{
Container: options.ContainerName,
Command: options.Command,
Stdin: options.Stdin != nil,
Stdout: options.Stdout != nil,
Stderr: options.Stderr != nil,
Stdout: stdout != nil,
Stderr: stderr != nil,
TTY: tty,
}, scheme.ParameterCodec)

Expand All @@ -134,9 +182,14 @@ func execStream(kubeCli kubernetes.Interface, config *restclient.Config, options
req.URL(),
config,
options.Stdin,
options.Stdout,
options.Stderr,
stdout,
stderr,
tty)

if err != nil {
err = NewExecError(err, stdoutTail, stderrTail)
}

errCh <- err
}()

Expand Down
79 changes: 79 additions & 0 deletions pkg/kube/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kube
import (
"bytes"
"context"
"errors"
"strings"
"time"

Expand Down Expand Up @@ -145,6 +146,84 @@ func (s *ExecSuite) TestExecWithWriterOptions(c *C) {
}
}

func (s *ExecSuite) TestErrorInExecWithOptions(c *C) {
c.Assert(s.pod.Status.Phase, Equals, v1.PodRunning)
c.Assert(len(s.pod.Status.ContainerStatuses) > 0, Equals, true)

var testCases = []struct {
cmd []string
expectedOut []string
expectedErr []string
expectedText string
}{
{
cmd: []string{"sh", "-c", "printf 'test\ntest1\ntest2\ntest3\ntest4\ntest5\ntest6\ntest7\ntest8\ntest9\ntest10' && exit 1"},
expectedOut: []string{"test", "test1", "test2", "test3", "test4", "test5", "test6", "test7", "test8", "test9", "test10"},
expectedErr: []string{},
expectedText: "command terminated with exit code 1.\nstdout: test1\r\ntest2\r\ntest3\r\ntest4\r\ntest5\r\ntest6\r\ntest7\r\ntest8\r\ntest9\r\ntest10\nstderr: ",
},
{
cmd: []string{"sh", "-c", "printf 'test\ntest1\ntest2\ntest3\ntest4\ntest5\ntest6\ntest7\ntest8\ntest9\ntest10' >&2 && exit 1"},
expectedOut: []string{},
expectedErr: []string{"test", "test1", "test2", "test3", "test4", "test5", "test6", "test7", "test8", "test9", "test10"},
expectedText: "command terminated with exit code 1.\nstdout: \nstderr: test1\r\ntest2\r\ntest3\r\ntest4\r\ntest5\r\ntest6\r\ntest7\r\ntest8\r\ntest9\r\ntest10",
},
}

getSliceTail := func(slice []string, length int) []string {
if len(slice) > length {
return slice[len(slice)-length:]
}

return slice
}

for _, testCase := range testCases {
// First invocation is without stdout and stderr buffers
opts := ExecOptions{
Command: testCase.cmd,
Namespace: s.pod.Namespace,
PodName: s.pod.Name,
ContainerName: "", // use default container
Stdin: nil,
}
_, _, err1 := ExecWithOptions(s.cli, opts) // Output is not needed
c.Assert(err1, Not(IsNil))

var ee1 *ExecError
ok := errors.As(err1, &ee1)
c.Assert(ok, Equals, true)
c.Assert(ee1.Stdout(), Not(Equals), testCase.expectedOut)
c.Assert(ee1.Stderr(), Not(Equals), testCase.expectedErr)
c.Assert(ee1.Error(), Equals, testCase.expectedText)

// Now try the same with passing buffers for stdout and stderr
// This should not affect returned error
bufout := bytes.Buffer{}
buferr := bytes.Buffer{}
opts.Stdout = &bufout
opts.Stderr = &buferr

_, _, err2 := ExecWithOptions(s.cli, opts) // Output is not needed
c.Assert(err2, Not(IsNil))

var ee2 *ExecError
ok = errors.As(err2, &ee2)
c.Assert(ok, Equals, true)

// When error happens, stdout/stderr buffers should contain all lines produced by an app
c.Assert(bufout.String(), Equals, strings.Join(testCase.expectedOut, "\n"))
c.Assert(buferr.String(), Equals, strings.Join(testCase.expectedErr, "\n"))

// When error happens, ExecError should contain only last ten lines of stdout/stderr
c.Assert(ee2.Stdout(), Equals, strings.Join(getSliceTail(testCase.expectedOut, logTailDefaultLength), "\r\n"))
c.Assert(ee2.Stderr(), Equals, strings.Join(getSliceTail(testCase.expectedErr, logTailDefaultLength), "\r\n"))

// When error happens, ExecError should include stdout/stderr into its text representation
c.Assert(ee2.Error(), Equals, testCase.expectedText)
}
}

func (s *ExecSuite) TestExecEcho(c *C) {
cmd := []string{"sh", "-c", "cat -"}
c.Assert(s.pod.Status.Phase, Equals, v1.PodRunning)
Expand Down
54 changes: 4 additions & 50 deletions pkg/kube/pod_command_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,6 @@ import (
"k8s.io/client-go/kubernetes"
)

// ExecError is an error returned by PodCommandExecutor.Exec
// It contains not only error happened during an execution, but also keeps tails of stdout/stderr streams.
// These tails could be used by the invoker to construct more precise error.
type ExecError struct {
error
stdout LogTail
stderr LogTail
}

// NewExecError creates an instance of ExecError
func NewExecError(err error, stdout, stderr LogTail) *ExecError {
return &ExecError{
error: err,
stdout: stdout,
stderr: stderr,
}
}

func (e *ExecError) Unwrap() error {
return e.error
}

func (e *ExecError) Stdout() string {
return e.stdout.ToString()
}

func (e *ExecError) Stderr() string {
return e.stderr.ToString()
}

// PodCommandExecutor provides a way to execute a command within the pod.
// Is intended to be returned by PodController and works with pod controlled by it.
type PodCommandExecutor interface {
Expand All @@ -68,32 +38,23 @@ type podCommandExecutor struct {
}

// Exec runs the command and logs stdout and stderr.
// In case of execution error, ExecError will be returned
// In case of execution error, ExecError produced by ExecWithOptions will be returned
func (p *podCommandExecutor) Exec(ctx context.Context, command []string, stdin io.Reader, stdout, stderr io.Writer) error {
var (
stderrTail = NewLogTail(logTailDefaultLength)
stdoutTail = NewLogTail(logTailDefaultLength)
opts = ExecOptions{
opts = ExecOptions{
Command: command,
Namespace: p.namespace,
PodName: p.podName,
ContainerName: p.containerName,
Stdin: stdin,
Stdout: stdoutTail,
Stderr: stderrTail,
Stdout: stdout,
Stderr: stderr,
}

cmdDone = make(chan struct{})
err error
)

if stdout != nil {
opts.Stdout = io.MultiWriter(stdout, stdoutTail)
}
if stderr != nil {
opts.Stderr = io.MultiWriter(stderr, stderrTail)
}

go func() {
_, _, err = p.pcep.ExecWithOptions(opts)
close(cmdDone)
Expand All @@ -103,13 +64,6 @@ func (p *podCommandExecutor) Exec(ctx context.Context, command []string, stdin i
case <-ctx.Done():
err = ctx.Err()
case <-cmdDone:
if err != nil {
err = &ExecError{
error: err,
stdout: stdoutTail,
stderr: stderrTail,
}
}
}

return err
Expand Down
45 changes: 0 additions & 45 deletions pkg/kube/pod_command_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ package kube
import (
"bytes"
"context"
"fmt"
"os"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -183,49 +181,6 @@ func (s *PodCommandExecutorTestSuite) TestPodRunnerExec(c *C) {
c.Assert(bStdout.String(), Equals, expStdout)
c.Assert(bStderr.String(), Equals, expStderr)
},
"In case of failure, we have tail of logs": func(ctx context.Context, pr PodCommandExecutor, prp *fakePodCommandExecutorProcessor) {
var errorLines []string
var outputLines []string
for i := 1; i <= 12; i++ {
errorLines = append(errorLines, fmt.Sprintf("error line %d", i))
outputLines = append(outputLines, fmt.Sprintf("output line %d", i))
}

var err error
prp.execWithOptionsStdout = strings.Join(outputLines, "\n")
prp.execWithOptionsStderr = strings.Join(errorLines, "\n")
prp.execWithOptionsErr = errors.New("SimulatedError")

expStdout := prp.execWithOptionsStdout
expStderr := prp.execWithOptionsStderr
expErrorStderr := strings.Join(errorLines[2:], "\r\n")
expErrorStdout := strings.Join(outputLines[2:], "\r\n")

var bStdin, bStdout, bStderr bytes.Buffer
var wg sync.WaitGroup
wg.Add(1)
go func() {
err = pr.Exec(ctx, command, &bStdin, &bStdout, &bStderr)
wg.Done()
}()
prp.execWithOptionsSyncStart.Sync() // Ensure ExecWithOptions is called
wg.Wait()
prp.execWithOptionsSyncEnd.Sync() // Release ExecWithOptions

c.Assert(err, Not(IsNil))
c.Assert(prp.inExecWithOptionsOpts.Stdout, Not(IsNil))
c.Assert(prp.inExecWithOptionsOpts.Stderr, Not(IsNil))
c.Assert(bStdout.Len() > 0, Equals, true)
c.Assert(bStderr.Len() > 0, Equals, true)
c.Assert(bStdout.String(), Equals, expStdout)
c.Assert(bStderr.String(), Equals, expStderr)

var ee *ExecError
c.Assert(errors.As(err, &ee), Equals, true)
c.Assert(ee.Error(), Equals, "SimulatedError")
c.Assert(ee.Stderr(), Equals, expErrorStderr)
c.Assert(ee.Stdout(), Equals, expErrorStdout)
},
}

for l, tc := range cases {
Expand Down

0 comments on commit f435763

Please sign in to comment.