Skip to content

Commit

Permalink
Add New ExecOutput Function To Support Custom Writers (#1331)
Browse files Browse the repository at this point in the history
* Stream output from KubeExec

* Set error in pipe readers instead of a channel

* Fix compile errors

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Handle returned error by exec stream

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Don't add error while closing pipe

Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>

* Add a new kube.ExecAsync() function to handle async 'exec' outputs

This new library function works like kube.Exec(), except that it writes
the inbound outputs to stdout and stderr immediately. It delegates the
writes to a new format.Writer instance, which formats the outputs to match
the Kanister-style logs, before writing them to either stdout or stderr.

This ensures that the outputs of the remote exec operation is available
to log consumers immediately.

Signed-off-by: Ivan Sim <ivan.sim@kasten.io>

* Remove merge error

Signed-off-by: Ivan Sim <ivan.sim@kasten.io>

* Fix unit test

Signed-off-by: Ivan Sim <ivan.sim@kasten.io>

* Add stdout and stderr as function params

Signed-off-by: Ivan Sim <ivan.sim@kasten.io>

* Handle phase outputs found in stdout

Signed-off-by: Ivan Sim <ivan.sim@kasten.io>

Co-authored-by: Tom Manville <tom@kasten.io>
Co-authored-by: Prasad Ghangal <prasad.ghangal@gmail.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
4 people committed May 4, 2022
1 parent c3ee585 commit f3801e7
Show file tree
Hide file tree
Showing 10 changed files with 236 additions and 30 deletions.
17 changes: 17 additions & 0 deletions pkg/format/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@ func Log(podName string, containerName string, output string) {
LogWithCtx(context.Background(), podName, containerName, output)
}

// LogTo prints output to w. The specified pod and container are added to the
// log line as fields.
func LogTo(w io.Writer, pod string, container string, output string) {
if output != "" {
for _, line := range regex.Split(output, -1) {
if line != "" {
fields := field.M{
"Pod": pod,
"Container": container,
"Out": line,
}
log.PrintTo(w, "action update", fields)
}
}
}
}

func LogStream(podName string, containerName string, output io.ReadCloser) chan string {
logCh := make(chan string, 100)
s := bufio.NewScanner(output)
Expand Down
18 changes: 18 additions & 0 deletions pkg/format/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package format

import (
"io"
)

// Writer formats strings before writing them to its underlying writer.
type Writer struct {
W io.Writer
Pod string
Container string
}

// Write formats p and write it to w's writer.
func (w *Writer) Write(p []byte) (int, error) {
LogTo(w.W, w.Pod, w.Container, string(p))
return len(p), nil
}
19 changes: 10 additions & 9 deletions pkg/function/kube_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
package function

import (
"bytes"
"context"
"io"
"os"
"regexp"

"github.com/pkg/errors"

kanister "github.com/kanisterio/kanister/pkg"
"github.com/kanisterio/kanister/pkg/format"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/output"
"github.com/kanisterio/kanister/pkg/param"
Expand Down Expand Up @@ -91,15 +91,16 @@ func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args
if err = Arg(args, KubeExecCommandArg, &cmd); err != nil {
return nil, err
}
stdout, stderr, err := kube.Exec(cli, namespace, pod, container, cmd, nil)
format.LogWithCtx(ctx, pod, container, stdout)
format.LogWithCtx(ctx, pod, container, stderr)
if err != nil {

var (
bufStdout = &bytes.Buffer{}
outWriters = io.MultiWriter(os.Stdout, bufStdout)
)
if err := kube.ExecOutput(cli, namespace, pod, container, cmd, nil, outWriters, os.Stderr); err != nil {
return nil, err
}

out, err := parseLogAndCreateOutput(stdout)
return out, errors.Wrap(err, "Failed to generate output")
return parseLogAndCreateOutput(bufStdout.String())
}

func (*kubeExecFunc) RequiredArgs() []string {
Expand Down
6 changes: 4 additions & 2 deletions pkg/function/kube_exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,11 @@ func (s *KubeExecTest) TestParseLogAndCreateOutput(c *C) {
outChecker Checker
}{
{"###Phase-output###: {\"key\":\"version\",\"value\":\"0.78.0\"}", map[string]interface{}{"version": "0.78.0"}, IsNil, NotNil},
{"###Phase-output###: {\"key\":\"version\",\"value\":\"0.78.0\"}\n###Phase-output###: {\"key\":\"path\",\"value\":\"/backup/path\"}",
map[string]interface{}{"version": "0.78.0", "path": "/backup/path"}, IsNil, NotNil},
{"Random message ###Phase-output###: {\"key\":\"version\",\"value\":\"0.78.0\"}", map[string]interface{}{"version": "0.78.0"}, IsNil, NotNil},
{"###Phase-output###: {\"key\":\"version\",\"value\":\"0.78.0\"},Random message", map[string]interface{}{"version": "0.78.0"}, IsNil, NotNil},
{"Random message ###Phase-output###: {\"key\":\"version\",\"value\":\"0.78.0\"},Random message", map[string]interface{}{"version": "0.78.0"}, IsNil, NotNil},
{"{\"Out\":\"###Phase-output###: {\"key\":\"version\",\"value\":\"0.78.0\"}\"}", map[string]interface{}{"version": "0.78.0"}, IsNil, NotNil},
{"{\"Out\":\"###Phase-output###: {\"key\":\"version\",\"value\":\"0.78.0\"}\", \"Pod\":\"my-pod\"}", map[string]interface{}{"version": "0.78.0"}, IsNil, NotNil},
{"Random message with newline \n###Phase-output###: {\"key\":\"version\",\"value\":\"0.78.0\"}", map[string]interface{}{"version": "0.78.0"}, IsNil, NotNil},
{"###Phase-output###: Invalid message", nil, NotNil, IsNil},
{"Random message", nil, IsNil, IsNil},
Expand Down
84 changes: 68 additions & 16 deletions pkg/kube/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"net/url"
"strings"

"k8s.io/api/core/v1"
"github.com/kanisterio/kanister/pkg/format"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
Expand All @@ -35,9 +37,9 @@ type ExecOptions struct {
PodName string
ContainerName string

Stdin io.Reader
CaptureStdout bool
CaptureStderr bool
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
}

// Exec is our version of the call to `kubectl exec` that does not depend on
Expand All @@ -49,24 +51,68 @@ func Exec(cli kubernetes.Interface, namespace, pod, container string, command []
PodName: pod,
ContainerName: container,
Stdin: stdin,
CaptureStdout: true,
CaptureStderr: true,
}
return ExecWithOptions(cli, opts)
}

// ExecOutput is similar to Exec, except that inbound outputs are written to the
// provided stdout and stderr. Unlike Exec, the outputs are not returned to the
// caller.
func ExecOutput(cli kubernetes.Interface, namespace, pod, container string, command []string, stdin io.Reader, stdout, stderr io.Writer) error {
opts := ExecOptions{
Command: command,
Namespace: namespace,
PodName: pod,
ContainerName: container,
Stdin: stdin,
Stdout: &format.Writer{
W: stdout,
Pod: pod,
Container: container,
},
Stderr: &format.Writer{
W: stderr,
Pod: pod,
Container: container,
},
}

_, _, err := ExecWithOptions(cli, opts)
return err
}

// ExecWithOptions executes a command in the specified container,
// returning stdout, stderr and error. `options` allowed for
// additional parameters to be passed.
func ExecWithOptions(kubeCli kubernetes.Interface, options ExecOptions) (string, string, error) {
config, err := LoadConfig()
if err != nil {
return "", "", err
}

outbuf := &bytes.Buffer{}
if options.Stdout == nil {
options.Stdout = outbuf
}

errbuf := &bytes.Buffer{}
if options.Stderr == nil {
options.Stderr = errbuf
}

errCh := execStream(kubeCli, config, options)
err = <-errCh
return strings.TrimSpace(outbuf.String()), strings.TrimSpace(errbuf.String()), errors.Wrap(err, "Failed to exec command in pod")
}

func execStream(kubeCli kubernetes.Interface, config *restclient.Config, options ExecOptions) chan error {
const tty = false
req := kubeCli.CoreV1().RESTClient().Post().
Resource("pods").
Name(options.PodName).
Namespace(options.Namespace).
SubResource("exec")

// Add container name if passed
if len(options.ContainerName) != 0 {
req.Param("container", options.ContainerName)
}
Expand All @@ -75,19 +121,25 @@ func ExecWithOptions(kubeCli kubernetes.Interface, options ExecOptions) (string,
Container: options.ContainerName,
Command: options.Command,
Stdin: options.Stdin != nil,
Stdout: options.CaptureStdout,
Stderr: options.CaptureStderr,
Stdout: options.Stdout != nil,
Stderr: options.Stderr != nil,
TTY: tty,
}, scheme.ParameterCodec)

config, err := LoadConfig()
if err != nil {
return "", "", err
}
errCh := make(chan error, 1)
go func() {
err := execute(
"POST",
req.URL(),
config,
options.Stdin,
options.Stdout,
options.Stderr,
tty)
errCh <- err
}()

var stdout, stderr bytes.Buffer
err = execute("POST", req.URL(), config, options.Stdin, &stdout, &stderr, tty)
return strings.TrimSpace(stdout.String()), strings.TrimSpace(stderr.String()), err
return errCh
}

func execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
Expand Down
67 changes: 67 additions & 0 deletions pkg/kube/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,73 @@ func (s *ExecSuite) TearDownSuite(c *C) {
}
}

func (s *ExecSuite) TestStderr(c *C) {
cmd := []string{"sh", "-c", "echo -n hello >&2"}
for _, cs := range s.pod.Status.ContainerStatuses {
stdout, stderr, err := Exec(s.cli, s.pod.Namespace, s.pod.Name, cs.Name, cmd, nil)
c.Assert(err, IsNil)
c.Assert(stdout, Equals, "")
c.Assert(stderr, Equals, "hello")
}

cmd = []string{"sh", "-c", "echo -n hello && exit 1"}
for _, cs := range s.pod.Status.ContainerStatuses {
stdout, stderr, err := Exec(s.cli, s.pod.Namespace, s.pod.Name, cs.Name, cmd, nil)
c.Assert(err, NotNil)
c.Assert(stdout, Equals, "hello")
c.Assert(stderr, Equals, "")
}

cmd = []string{"sh", "-c", "count=0; while true; do printf $count; let count=$count+1; if [ $count -eq 6 ]; then exit 1; fi; done"}
for _, cs := range s.pod.Status.ContainerStatuses {
stdout, stderr, err := Exec(s.cli, s.pod.Namespace, s.pod.Name, cs.Name, cmd, nil)
c.Assert(err, NotNil)
c.Assert(stdout, Equals, "012345")
c.Assert(stderr, Equals, "")
}
}

func (s *ExecSuite) TestExecWithWriterOptions(c *C) {
c.Assert(s.pod.Status.Phase, Equals, v1.PodRunning)
c.Assert(len(s.pod.Status.ContainerStatuses) > 0, Equals, true)

var testCases = []struct {
cmd []string
expectedOut string
expectedErr string
}{
{
cmd: []string{"sh", "-c", "printf 'test'"},
expectedOut: "test",
expectedErr: "",
},
{
cmd: []string{"sh", "-c", "printf 'test' >&2"},
expectedOut: "",
expectedErr: "test",
},
}

for _, testCase := range testCases {
bufout := &bytes.Buffer{}
buferr := &bytes.Buffer{}

opts := ExecOptions{
Command: testCase.cmd,
Namespace: s.pod.Namespace,
PodName: s.pod.Name,
ContainerName: "", // use default container
Stdin: nil,
Stdout: bufout,
Stderr: buferr,
}
_, _, err := ExecWithOptions(s.cli, opts)
c.Assert(err, IsNil)
c.Assert(bufout.String(), Equals, testCase.expectedOut)
c.Assert(buferr.String(), Equals, testCase.expectedErr)
}
}

func (s *ExecSuite) TestExecEcho(c *C) {
cmd := []string{"sh", "-c", "cat -"}
c.Assert(s.pod.Status.Phase, Equals, v1.PodRunning)
Expand Down
20 changes: 19 additions & 1 deletion pkg/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package log
import (
"context"
"encoding/json"
"io"
"os"
"strings"
"time"
Expand Down Expand Up @@ -167,6 +168,12 @@ func Print(msg string, fields ...field.M) {
Info().Print(msg, fields...)
}

// PrintTo works just like Print. It allows caller to specify the writer to use
// to output the log.
func PrintTo(w io.Writer, msg string, fields ...field.M) {
Info().PrintTo(w, msg, fields...)
}

func WithContext(ctx context.Context) Logger {
return Info().WithContext(ctx)
}
Expand All @@ -176,6 +183,17 @@ func WithError(err error) Logger {
}

func (l *logger) Print(msg string, fields ...field.M) {
entry := l.entry(fields...)
entry.Logln(logrus.Level(l.level), msg)
}

func (l *logger) PrintTo(w io.Writer, msg string, fields ...field.M) {
entry := l.entry(fields...)
entry.Logger.SetOutput(w)
entry.Logln(logrus.Level(l.level), msg)
}

func (l *logger) entry(fields ...field.M) *logrus.Entry {
logFields := make(logrus.Fields)

if envVarFields != nil {
Expand Down Expand Up @@ -205,7 +223,7 @@ func (l *logger) Print(msg string, fields ...field.M) {
if l.err != nil {
entry = entry.WithError(l.err)
}
entry.Logln(logrus.Level(l.level), msg)
return entry
}

func (l *logger) WithContext(ctx context.Context) Logger {
Expand Down
16 changes: 16 additions & 0 deletions pkg/log/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,22 @@ func (s *LogSuite) TestLogWithContextFieldsAndError(c *C) {
c.Assert(entry["key"], Equals, "value")
}

func (s *LogSuite) TestLogPrintTo(c *C) {
buf := &bytes.Buffer{}
msg := "test log message"
fields := map[string]interface{}{
"field1": "value1",
"field2": "value2",
}
PrintTo(buf, msg, fields)

entry := map[string]interface{}{}
err := json.Unmarshal(buf.Bytes(), &entry)
c.Assert(err, IsNil)
c.Assert(entry, NotNil)
c.Assert(entry["msg"], Equals, msg)
}

func testLogMessage(c *C, msg string, print func(string, ...field.M), fields ...field.M) map[string]interface{} {
log.SetFormatter(&logrus.JSONFormatter{TimestampFormat: time.RFC3339Nano})
var memLog bytes.Buffer
Expand Down
2 changes: 2 additions & 0 deletions pkg/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package log

import (
"context"
"io"

"github.com/kanisterio/kanister/pkg/field"
)

type Logger interface {
Print(msg string, fields ...field.M)
PrintTo(w io.Writer, msg string, fields ...field.M)
WithContext(ctx context.Context) Logger
WithError(err error) Logger
}
Loading

0 comments on commit f3801e7

Please sign in to comment.