Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Common exec error handling #2501

Merged
merged 5 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 57 additions & 4 deletions pkg/kube/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kube
import (
"bytes"
"context"
"fmt"
"io"
"net/url"
"strings"
Expand All @@ -30,6 +31,40 @@ import (
"k8s.io/client-go/tools/remotecommand"
)

// ExecError is an error returned by kube.Exec, kube.ExecOutput and kube.ExecWithOptions.
// It contains not only error happened during an execution, but also keeps tails of stdout/stderr streams.
// These tails could be used by the invoker to construct more precise error.
type ExecError struct {
error
stdout LogTail
stderr LogTail
}

// NewExecError creates an instance of ExecError
func NewExecError(err error, stdout, stderr LogTail) *ExecError {
return &ExecError{
error: err,
stdout: stdout,
stderr: stderr,
}
}

func (e *ExecError) Error() string {
return fmt.Sprintf("%s.\nstdout: %s\nstderr: %s", e.error.Error(), e.Stdout(), e.Stderr())
PrasadG193 marked this conversation as resolved.
Show resolved Hide resolved
}

func (e *ExecError) Unwrap() error {
return e.error
}

func (e *ExecError) Stdout() string {
return e.stdout.ToString()
}

func (e *ExecError) Stderr() string {
return e.stderr.ToString()
}

// ExecOptions passed to ExecWithOptions
type ExecOptions struct {
Command []string
Expand Down Expand Up @@ -118,12 +153,25 @@ func execStream(kubeCli kubernetes.Interface, config *restclient.Config, options
req.Param("container", options.ContainerName)
}

stderrTail := NewLogTail(logTailDefaultLength)
stdoutTail := NewLogTail(logTailDefaultLength)

var stdout io.Writer = stdoutTail
if options.Stdout != nil {
stdout = io.MultiWriter(options.Stdout, stdoutTail)
}

var stderr io.Writer = stderrTail
if options.Stderr != nil {
stderr = io.MultiWriter(options.Stderr, stderrTail)
}

req.VersionedParams(&v1.PodExecOptions{
Container: options.ContainerName,
Command: options.Command,
Stdin: options.Stdin != nil,
Stdout: options.Stdout != nil,
Stderr: options.Stderr != nil,
Stdout: stdout != nil,
Stderr: stderr != nil,
TTY: tty,
}, scheme.ParameterCodec)

Expand All @@ -134,9 +182,14 @@ func execStream(kubeCli kubernetes.Interface, config *restclient.Config, options
req.URL(),
config,
options.Stdin,
options.Stdout,
options.Stderr,
stdout,
stderr,
tty)

if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that necessary here?
It seems like this will change the behaviour for ExecWithOptions so it always returns error, should it be left in pod_command_executor?
Am I missing something here?

Copy link
Contributor Author

@e-sumin e-sumin Dec 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Old code was returning error (or nil) as is, new code returns nil as is, and wraps an error with ExecError (which contains stderr/stdout tails), what is actually was the purpose of this PR.

Because without that change, only PodCommandExecutor users were able to find/parse an error in logs, and we want to give this possibility for users of kube.Exec/kube.ExecWithOptions/kube.ExecOutput too.

err = NewExecError(err, stdoutTail, stderrTail)
}

errCh <- err
}()

Expand Down
79 changes: 79 additions & 0 deletions pkg/kube/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kube
import (
"bytes"
"context"
"errors"
"strings"
"time"

Expand Down Expand Up @@ -145,6 +146,84 @@ func (s *ExecSuite) TestExecWithWriterOptions(c *C) {
}
}

func (s *ExecSuite) TestErrorInExecWithOptions(c *C) {
c.Assert(s.pod.Status.Phase, Equals, v1.PodRunning)
c.Assert(len(s.pod.Status.ContainerStatuses) > 0, Equals, true)

var testCases = []struct {
cmd []string
expectedOut []string
expectedErr []string
expectedText string
}{
{
cmd: []string{"sh", "-c", "printf 'test\ntest1\ntest2\ntest3\ntest4\ntest5\ntest6\ntest7\ntest8\ntest9\ntest10' && exit 1"},
expectedOut: []string{"test", "test1", "test2", "test3", "test4", "test5", "test6", "test7", "test8", "test9", "test10"},
expectedErr: []string{},
expectedText: "command terminated with exit code 1.\nstdout: test1\r\ntest2\r\ntest3\r\ntest4\r\ntest5\r\ntest6\r\ntest7\r\ntest8\r\ntest9\r\ntest10\nstderr: ",
},
{
cmd: []string{"sh", "-c", "printf 'test\ntest1\ntest2\ntest3\ntest4\ntest5\ntest6\ntest7\ntest8\ntest9\ntest10' >&2 && exit 1"},
expectedOut: []string{},
expectedErr: []string{"test", "test1", "test2", "test3", "test4", "test5", "test6", "test7", "test8", "test9", "test10"},
expectedText: "command terminated with exit code 1.\nstdout: \nstderr: test1\r\ntest2\r\ntest3\r\ntest4\r\ntest5\r\ntest6\r\ntest7\r\ntest8\r\ntest9\r\ntest10",
},
}

getSliceTail := func(slice []string, length int) []string {
if len(slice) > length {
return slice[len(slice)-length:]
}

return slice
}

for _, testCase := range testCases {
// First invocation is without stdout and stderr buffers
opts := ExecOptions{
Command: testCase.cmd,
Namespace: s.pod.Namespace,
PodName: s.pod.Name,
ContainerName: "", // use default container
Stdin: nil,
}
_, _, err1 := ExecWithOptions(s.cli, opts) // Output is not needed
c.Assert(err1, Not(IsNil))

var ee1 *ExecError
ok := errors.As(err1, &ee1)
c.Assert(ok, Equals, true)
c.Assert(ee1.Stdout(), Not(Equals), testCase.expectedOut)
c.Assert(ee1.Stderr(), Not(Equals), testCase.expectedErr)
c.Assert(ee1.Error(), Equals, testCase.expectedText)

// Now try the same with passing buffers for stdout and stderr
// This should not affect returned error
bufout := bytes.Buffer{}
buferr := bytes.Buffer{}
opts.Stdout = &bufout
opts.Stderr = &buferr

_, _, err2 := ExecWithOptions(s.cli, opts) // Output is not needed
c.Assert(err2, Not(IsNil))

var ee2 *ExecError
ok = errors.As(err2, &ee2)
c.Assert(ok, Equals, true)

// When error happens, stdout/stderr buffers should contain all lines produced by an app
c.Assert(bufout.String(), Equals, strings.Join(testCase.expectedOut, "\n"))
c.Assert(buferr.String(), Equals, strings.Join(testCase.expectedErr, "\n"))

// When error happens, ExecError should contain only last ten lines of stdout/stderr
c.Assert(ee2.Stdout(), Equals, strings.Join(getSliceTail(testCase.expectedOut, logTailDefaultLength), "\r\n"))
c.Assert(ee2.Stderr(), Equals, strings.Join(getSliceTail(testCase.expectedErr, logTailDefaultLength), "\r\n"))

// When error happens, ExecError should include stdout/stderr into its text representation
c.Assert(ee2.Error(), Equals, testCase.expectedText)
}
}

func (s *ExecSuite) TestExecEcho(c *C) {
cmd := []string{"sh", "-c", "cat -"}
c.Assert(s.pod.Status.Phase, Equals, v1.PodRunning)
Expand Down
54 changes: 4 additions & 50 deletions pkg/kube/pod_command_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,6 @@ import (
"k8s.io/client-go/kubernetes"
)

// ExecError is an error returned by PodCommandExecutor.Exec
// It contains not only error happened during an execution, but also keeps tails of stdout/stderr streams.
// These tails could be used by the invoker to construct more precise error.
type ExecError struct {
error
stdout LogTail
stderr LogTail
}

// NewExecError creates an instance of ExecError
func NewExecError(err error, stdout, stderr LogTail) *ExecError {
return &ExecError{
error: err,
stdout: stdout,
stderr: stderr,
}
}

func (e *ExecError) Unwrap() error {
return e.error
}

func (e *ExecError) Stdout() string {
return e.stdout.ToString()
}

func (e *ExecError) Stderr() string {
return e.stderr.ToString()
}

// PodCommandExecutor provides a way to execute a command within the pod.
// Is intended to be returned by PodController and works with pod controlled by it.
type PodCommandExecutor interface {
Expand All @@ -68,32 +38,23 @@ type podCommandExecutor struct {
}

// Exec runs the command and logs stdout and stderr.
// In case of execution error, ExecError will be returned
// In case of execution error, ExecError produced by ExecWithOptions will be returned
func (p *podCommandExecutor) Exec(ctx context.Context, command []string, stdin io.Reader, stdout, stderr io.Writer) error {
var (
stderrTail = NewLogTail(logTailDefaultLength)
stdoutTail = NewLogTail(logTailDefaultLength)
opts = ExecOptions{
opts = ExecOptions{
Command: command,
Namespace: p.namespace,
PodName: p.podName,
ContainerName: p.containerName,
Stdin: stdin,
Stdout: stdoutTail,
Stderr: stderrTail,
Stdout: stdout,
Stderr: stderr,
}

cmdDone = make(chan struct{})
err error
)

if stdout != nil {
opts.Stdout = io.MultiWriter(stdout, stdoutTail)
}
if stderr != nil {
opts.Stderr = io.MultiWriter(stderr, stderrTail)
}

go func() {
_, _, err = p.pcep.ExecWithOptions(opts)
close(cmdDone)
Expand All @@ -103,13 +64,6 @@ func (p *podCommandExecutor) Exec(ctx context.Context, command []string, stdin i
case <-ctx.Done():
err = ctx.Err()
case <-cmdDone:
if err != nil {
err = &ExecError{
error: err,
stdout: stdoutTail,
stderr: stderrTail,
}
}
}

return err
Expand Down
45 changes: 0 additions & 45 deletions pkg/kube/pod_command_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ package kube
import (
"bytes"
"context"
"fmt"
"os"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -183,49 +181,6 @@ func (s *PodCommandExecutorTestSuite) TestPodRunnerExec(c *C) {
c.Assert(bStdout.String(), Equals, expStdout)
c.Assert(bStderr.String(), Equals, expStderr)
},
"In case of failure, we have tail of logs": func(ctx context.Context, pr PodCommandExecutor, prp *fakePodCommandExecutorProcessor) {
var errorLines []string
var outputLines []string
for i := 1; i <= 12; i++ {
errorLines = append(errorLines, fmt.Sprintf("error line %d", i))
outputLines = append(outputLines, fmt.Sprintf("output line %d", i))
}

var err error
prp.execWithOptionsStdout = strings.Join(outputLines, "\n")
prp.execWithOptionsStderr = strings.Join(errorLines, "\n")
prp.execWithOptionsErr = errors.New("SimulatedError")

expStdout := prp.execWithOptionsStdout
expStderr := prp.execWithOptionsStderr
expErrorStderr := strings.Join(errorLines[2:], "\r\n")
expErrorStdout := strings.Join(outputLines[2:], "\r\n")

var bStdin, bStdout, bStderr bytes.Buffer
var wg sync.WaitGroup
wg.Add(1)
go func() {
err = pr.Exec(ctx, command, &bStdin, &bStdout, &bStderr)
wg.Done()
}()
prp.execWithOptionsSyncStart.Sync() // Ensure ExecWithOptions is called
wg.Wait()
prp.execWithOptionsSyncEnd.Sync() // Release ExecWithOptions

c.Assert(err, Not(IsNil))
c.Assert(prp.inExecWithOptionsOpts.Stdout, Not(IsNil))
c.Assert(prp.inExecWithOptionsOpts.Stderr, Not(IsNil))
c.Assert(bStdout.Len() > 0, Equals, true)
c.Assert(bStderr.Len() > 0, Equals, true)
c.Assert(bStdout.String(), Equals, expStdout)
c.Assert(bStderr.String(), Equals, expStderr)

var ee *ExecError
c.Assert(errors.As(err, &ee), Equals, true)
c.Assert(ee.Error(), Equals, "SimulatedError")
c.Assert(ee.Stderr(), Equals, expErrorStderr)
c.Assert(ee.Stdout(), Equals, expErrorStdout)
},
}

for l, tc := range cases {
Expand Down