From f435763556b5d578bdc1dcf35df40b27e01e0f8f Mon Sep 17 00:00:00 2001 From: Eugene Sumin <95425330+e-sumin@users.noreply.github.com> Date: Wed, 13 Dec 2023 12:24:00 +0100 Subject: [PATCH] Common exec error handling (#2501) * When error happens during `kube.Exec`, error should capture tail of stdout/stderr streams. * Move `ExecError` definition from `PodCommandExecutor` to `kube.Exec` * `PodCommandExecutor` should just pass an `ExecError` produced by `kube.ExecWithOptions` --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- pkg/kube/exec.go | 61 +++++++++++++++++++-- pkg/kube/exec_test.go | 79 +++++++++++++++++++++++++++ pkg/kube/pod_command_executor.go | 54 ++---------------- pkg/kube/pod_command_executor_test.go | 45 --------------- 4 files changed, 140 insertions(+), 99 deletions(-) diff --git a/pkg/kube/exec.go b/pkg/kube/exec.go index ca47673c76..fc67002c22 100644 --- a/pkg/kube/exec.go +++ b/pkg/kube/exec.go @@ -17,6 +17,7 @@ package kube import ( "bytes" "context" + "fmt" "io" "net/url" "strings" @@ -30,6 +31,40 @@ import ( "k8s.io/client-go/tools/remotecommand" ) +// ExecError is an error returned by kube.Exec, kube.ExecOutput and kube.ExecWithOptions. +// It contains not only error happened during an execution, but also keeps tails of stdout/stderr streams. +// These tails could be used by the invoker to construct more precise error. +type ExecError struct { + error + stdout LogTail + stderr LogTail +} + +// NewExecError creates an instance of ExecError +func NewExecError(err error, stdout, stderr LogTail) *ExecError { + return &ExecError{ + error: err, + stdout: stdout, + stderr: stderr, + } +} + +func (e *ExecError) Error() string { + return fmt.Sprintf("%s.\nstdout: %s\nstderr: %s", e.error.Error(), e.Stdout(), e.Stderr()) +} + +func (e *ExecError) Unwrap() error { + return e.error +} + +func (e *ExecError) Stdout() string { + return e.stdout.ToString() +} + +func (e *ExecError) Stderr() string { + return e.stderr.ToString() +} + // ExecOptions passed to ExecWithOptions type ExecOptions struct { Command []string @@ -118,12 +153,25 @@ func execStream(kubeCli kubernetes.Interface, config *restclient.Config, options req.Param("container", options.ContainerName) } + stderrTail := NewLogTail(logTailDefaultLength) + stdoutTail := NewLogTail(logTailDefaultLength) + + var stdout io.Writer = stdoutTail + if options.Stdout != nil { + stdout = io.MultiWriter(options.Stdout, stdoutTail) + } + + var stderr io.Writer = stderrTail + if options.Stderr != nil { + stderr = io.MultiWriter(options.Stderr, stderrTail) + } + req.VersionedParams(&v1.PodExecOptions{ Container: options.ContainerName, Command: options.Command, Stdin: options.Stdin != nil, - Stdout: options.Stdout != nil, - Stderr: options.Stderr != nil, + Stdout: stdout != nil, + Stderr: stderr != nil, TTY: tty, }, scheme.ParameterCodec) @@ -134,9 +182,14 @@ func execStream(kubeCli kubernetes.Interface, config *restclient.Config, options req.URL(), config, options.Stdin, - options.Stdout, - options.Stderr, + stdout, + stderr, tty) + + if err != nil { + err = NewExecError(err, stdoutTail, stderrTail) + } + errCh <- err }() diff --git a/pkg/kube/exec_test.go b/pkg/kube/exec_test.go index daa6de3285..da11531a91 100644 --- a/pkg/kube/exec_test.go +++ b/pkg/kube/exec_test.go @@ -20,6 +20,7 @@ package kube import ( "bytes" "context" + "errors" "strings" "time" @@ -145,6 +146,84 @@ func (s *ExecSuite) TestExecWithWriterOptions(c *C) { } } +func (s *ExecSuite) TestErrorInExecWithOptions(c *C) { + c.Assert(s.pod.Status.Phase, Equals, v1.PodRunning) + c.Assert(len(s.pod.Status.ContainerStatuses) > 0, Equals, true) + + var testCases = []struct { + cmd []string + expectedOut []string + expectedErr []string + expectedText string + }{ + { + cmd: []string{"sh", "-c", "printf 'test\ntest1\ntest2\ntest3\ntest4\ntest5\ntest6\ntest7\ntest8\ntest9\ntest10' && exit 1"}, + expectedOut: []string{"test", "test1", "test2", "test3", "test4", "test5", "test6", "test7", "test8", "test9", "test10"}, + expectedErr: []string{}, + expectedText: "command terminated with exit code 1.\nstdout: test1\r\ntest2\r\ntest3\r\ntest4\r\ntest5\r\ntest6\r\ntest7\r\ntest8\r\ntest9\r\ntest10\nstderr: ", + }, + { + cmd: []string{"sh", "-c", "printf 'test\ntest1\ntest2\ntest3\ntest4\ntest5\ntest6\ntest7\ntest8\ntest9\ntest10' >&2 && exit 1"}, + expectedOut: []string{}, + expectedErr: []string{"test", "test1", "test2", "test3", "test4", "test5", "test6", "test7", "test8", "test9", "test10"}, + expectedText: "command terminated with exit code 1.\nstdout: \nstderr: test1\r\ntest2\r\ntest3\r\ntest4\r\ntest5\r\ntest6\r\ntest7\r\ntest8\r\ntest9\r\ntest10", + }, + } + + getSliceTail := func(slice []string, length int) []string { + if len(slice) > length { + return slice[len(slice)-length:] + } + + return slice + } + + for _, testCase := range testCases { + // First invocation is without stdout and stderr buffers + opts := ExecOptions{ + Command: testCase.cmd, + Namespace: s.pod.Namespace, + PodName: s.pod.Name, + ContainerName: "", // use default container + Stdin: nil, + } + _, _, err1 := ExecWithOptions(s.cli, opts) // Output is not needed + c.Assert(err1, Not(IsNil)) + + var ee1 *ExecError + ok := errors.As(err1, &ee1) + c.Assert(ok, Equals, true) + c.Assert(ee1.Stdout(), Not(Equals), testCase.expectedOut) + c.Assert(ee1.Stderr(), Not(Equals), testCase.expectedErr) + c.Assert(ee1.Error(), Equals, testCase.expectedText) + + // Now try the same with passing buffers for stdout and stderr + // This should not affect returned error + bufout := bytes.Buffer{} + buferr := bytes.Buffer{} + opts.Stdout = &bufout + opts.Stderr = &buferr + + _, _, err2 := ExecWithOptions(s.cli, opts) // Output is not needed + c.Assert(err2, Not(IsNil)) + + var ee2 *ExecError + ok = errors.As(err2, &ee2) + c.Assert(ok, Equals, true) + + // When error happens, stdout/stderr buffers should contain all lines produced by an app + c.Assert(bufout.String(), Equals, strings.Join(testCase.expectedOut, "\n")) + c.Assert(buferr.String(), Equals, strings.Join(testCase.expectedErr, "\n")) + + // When error happens, ExecError should contain only last ten lines of stdout/stderr + c.Assert(ee2.Stdout(), Equals, strings.Join(getSliceTail(testCase.expectedOut, logTailDefaultLength), "\r\n")) + c.Assert(ee2.Stderr(), Equals, strings.Join(getSliceTail(testCase.expectedErr, logTailDefaultLength), "\r\n")) + + // When error happens, ExecError should include stdout/stderr into its text representation + c.Assert(ee2.Error(), Equals, testCase.expectedText) + } +} + func (s *ExecSuite) TestExecEcho(c *C) { cmd := []string{"sh", "-c", "cat -"} c.Assert(s.pod.Status.Phase, Equals, v1.PodRunning) diff --git a/pkg/kube/pod_command_executor.go b/pkg/kube/pod_command_executor.go index c011f9ba17..c6bc8f781a 100644 --- a/pkg/kube/pod_command_executor.go +++ b/pkg/kube/pod_command_executor.go @@ -21,36 +21,6 @@ import ( "k8s.io/client-go/kubernetes" ) -// ExecError is an error returned by PodCommandExecutor.Exec -// It contains not only error happened during an execution, but also keeps tails of stdout/stderr streams. -// These tails could be used by the invoker to construct more precise error. -type ExecError struct { - error - stdout LogTail - stderr LogTail -} - -// NewExecError creates an instance of ExecError -func NewExecError(err error, stdout, stderr LogTail) *ExecError { - return &ExecError{ - error: err, - stdout: stdout, - stderr: stderr, - } -} - -func (e *ExecError) Unwrap() error { - return e.error -} - -func (e *ExecError) Stdout() string { - return e.stdout.ToString() -} - -func (e *ExecError) Stderr() string { - return e.stderr.ToString() -} - // PodCommandExecutor provides a way to execute a command within the pod. // Is intended to be returned by PodController and works with pod controlled by it. type PodCommandExecutor interface { @@ -68,32 +38,23 @@ type podCommandExecutor struct { } // Exec runs the command and logs stdout and stderr. -// In case of execution error, ExecError will be returned +// In case of execution error, ExecError produced by ExecWithOptions will be returned func (p *podCommandExecutor) Exec(ctx context.Context, command []string, stdin io.Reader, stdout, stderr io.Writer) error { var ( - stderrTail = NewLogTail(logTailDefaultLength) - stdoutTail = NewLogTail(logTailDefaultLength) - opts = ExecOptions{ + opts = ExecOptions{ Command: command, Namespace: p.namespace, PodName: p.podName, ContainerName: p.containerName, Stdin: stdin, - Stdout: stdoutTail, - Stderr: stderrTail, + Stdout: stdout, + Stderr: stderr, } cmdDone = make(chan struct{}) err error ) - if stdout != nil { - opts.Stdout = io.MultiWriter(stdout, stdoutTail) - } - if stderr != nil { - opts.Stderr = io.MultiWriter(stderr, stderrTail) - } - go func() { _, _, err = p.pcep.ExecWithOptions(opts) close(cmdDone) @@ -103,13 +64,6 @@ func (p *podCommandExecutor) Exec(ctx context.Context, command []string, stdin i case <-ctx.Done(): err = ctx.Err() case <-cmdDone: - if err != nil { - err = &ExecError{ - error: err, - stdout: stdoutTail, - stderr: stderrTail, - } - } } return err diff --git a/pkg/kube/pod_command_executor_test.go b/pkg/kube/pod_command_executor_test.go index 98290815ed..41361a0e36 100644 --- a/pkg/kube/pod_command_executor_test.go +++ b/pkg/kube/pod_command_executor_test.go @@ -17,9 +17,7 @@ package kube import ( "bytes" "context" - "fmt" "os" - "strings" "sync" "time" @@ -183,49 +181,6 @@ func (s *PodCommandExecutorTestSuite) TestPodRunnerExec(c *C) { c.Assert(bStdout.String(), Equals, expStdout) c.Assert(bStderr.String(), Equals, expStderr) }, - "In case of failure, we have tail of logs": func(ctx context.Context, pr PodCommandExecutor, prp *fakePodCommandExecutorProcessor) { - var errorLines []string - var outputLines []string - for i := 1; i <= 12; i++ { - errorLines = append(errorLines, fmt.Sprintf("error line %d", i)) - outputLines = append(outputLines, fmt.Sprintf("output line %d", i)) - } - - var err error - prp.execWithOptionsStdout = strings.Join(outputLines, "\n") - prp.execWithOptionsStderr = strings.Join(errorLines, "\n") - prp.execWithOptionsErr = errors.New("SimulatedError") - - expStdout := prp.execWithOptionsStdout - expStderr := prp.execWithOptionsStderr - expErrorStderr := strings.Join(errorLines[2:], "\r\n") - expErrorStdout := strings.Join(outputLines[2:], "\r\n") - - var bStdin, bStdout, bStderr bytes.Buffer - var wg sync.WaitGroup - wg.Add(1) - go func() { - err = pr.Exec(ctx, command, &bStdin, &bStdout, &bStderr) - wg.Done() - }() - prp.execWithOptionsSyncStart.Sync() // Ensure ExecWithOptions is called - wg.Wait() - prp.execWithOptionsSyncEnd.Sync() // Release ExecWithOptions - - c.Assert(err, Not(IsNil)) - c.Assert(prp.inExecWithOptionsOpts.Stdout, Not(IsNil)) - c.Assert(prp.inExecWithOptionsOpts.Stderr, Not(IsNil)) - c.Assert(bStdout.Len() > 0, Equals, true) - c.Assert(bStderr.Len() > 0, Equals, true) - c.Assert(bStdout.String(), Equals, expStdout) - c.Assert(bStderr.String(), Equals, expStderr) - - var ee *ExecError - c.Assert(errors.As(err, &ee), Equals, true) - c.Assert(ee.Error(), Equals, "SimulatedError") - c.Assert(ee.Stderr(), Equals, expErrorStderr) - c.Assert(ee.Stdout(), Equals, expErrorStdout) - }, } for l, tc := range cases {