Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: switch to bufio.Reader for KubeTask output parsing #2641

Merged
merged 13 commits into from
Feb 15, 2024
41 changes: 25 additions & 16 deletions pkg/function/kube_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"context"
"io"
"os"
"regexp"
"strings"
"time"

kanister "github.com/kanisterio/kanister/pkg"
Expand Down Expand Up @@ -60,22 +60,31 @@ func parseLogAndCreateOutput(out string) (map[string]interface{}, error) {
if out == "" {
return nil, nil
}
var op map[string]interface{}
logs := regexp.MustCompile("[\n]").Split(out, -1)
for _, l := range logs {
opObj, err := output.Parse(l)
if err != nil {
return nil, err
}
if opObj == nil {
continue
}
if op == nil {
op = make(map[string]interface{})
}
op[opObj.Key] = opObj.Value

// var op map[string]interface{}
reader := io.NopCloser(strings.NewReader(out))
output, err := output.LogAndParse(context.Background(), reader)

// For some reason we expect empty output to be returned as nil here
if len(output) == 0 {
return nil, err
}
return op, nil
return output, err
// logs := regexp.MustCompile("[\n]").Split(out, -1)
// for _, l := range logs {
// opObj, err := output.Parse(l)
// if err != nil {
// return nil, err
// }
// if opObj == nil {
// continue
// }
// if op == nil {
// op = make(map[string]interface{})
// }
// op[opObj.Key] = opObj.Value
// }
// return op, nil
}

func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
Expand Down
62 changes: 62 additions & 0 deletions pkg/function/kube_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package function

import (
"context"
"fmt"
"os"
"strings"
"time"

. "gopkg.in/check.v1"
Expand Down Expand Up @@ -61,6 +63,24 @@ func (s *KubeTaskSuite) TearDownSuite(c *C) {
}
}

func bigOutputPhase(namespace string) crv1alpha1.BlueprintPhase {
longstring := strings.Repeat("a", 100000)
return crv1alpha1.BlueprintPhase{
Name: "testOutput",
Func: KubeTaskFuncName,
Args: map[string]interface{}{
KubeTaskNamespaceArg: namespace,
KubeTaskImageArg: consts.LatestKanisterToolsImage,
KubeTaskCommandArg: []string{
"sh",
"-c",
// We output a line for log only, and a line with output at the tail
fmt.Sprintf("echo -n %s > tmpfile; cat tmpfile; echo; cat tmpfile; kando output longstring $(cat tmpfile)", longstring),
},
},
}
}

func outputPhase(namespace string) crv1alpha1.BlueprintPhase {
return crv1alpha1.BlueprintPhase{
Name: "testOutput",
Expand Down Expand Up @@ -161,3 +181,45 @@ func (s *KubeTaskSuite) TestKubeTask(c *C) {
}
}
}

func (s *KubeTaskSuite) TestKubeTaskWithBigOutput(c *C) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
tp := param.TemplateParams{
StatefulSet: &param.StatefulSetParams{
Namespace: s.namespace,
},
PodOverride: crv1alpha1.JSONMap{
"containers": []map[string]interface{}{
{
"name": "container",
"imagePullPolicy": "Always",
},
},
},
}
expectedOut := strings.Repeat("a", 100000)
action := "test"
for _, tc := range []struct {
bp *crv1alpha1.Blueprint
outs []map[string]interface{}
}{
{
bp: newTaskBlueprint(bigOutputPhase(s.namespace)),
outs: []map[string]interface{}{
{
"longstring": expectedOut,
},
},
},
} {
phases, err := kanister.GetPhases(*tc.bp, action, kanister.DefaultVersion, tp)
c.Assert(err, IsNil)
c.Assert(phases, HasLen, len(tc.outs))
for i, p := range phases {
out, err := p.Exec(ctx, *tc.bp, action, tp)
c.Assert(err, IsNil, Commentf("Phase %s failed", p.Name()))
c.Assert(out, DeepEquals, tc.outs[i])
}
}
}
16 changes: 1 addition & 15 deletions pkg/output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"io"
"os"
"regexp"
"strings"

"github.com/pkg/errors"
)
Expand All @@ -47,7 +46,7 @@ func marshalOutput(key, value string) (string, error) {
}

// UnmarshalOutput unmarshals output json into Output struct
func UnmarshalOutput(opString string) (*Output, error) {
func UnmarshalOutput(opString []byte) (*Output, error) {
p := &Output{}
err := json.Unmarshal([]byte(opString), p)
return p, errors.Wrap(err, "Failed to unmarshal key-value pair")
Expand Down Expand Up @@ -89,16 +88,3 @@ func fPrintOutput(w io.Writer, key, value string) error {
const reStr = PhaseOpString + `(.*)$`

var logRE = regexp.MustCompile(reStr)

func Parse(l string) (*Output, error) {
l = strings.TrimSpace(l)
match := logRE.FindAllStringSubmatch(l, 1)
if len(match) == 0 {
return nil, nil
}
op, err := UnmarshalOutput(match[0][1])
if err != nil {
return nil, err
}
return op, nil
}
21 changes: 19 additions & 2 deletions pkg/output/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ package output

import (
"bytes"
"context"
"io"
"strings"
"testing"

. "gopkg.in/check.v1"
Expand Down Expand Up @@ -45,13 +48,27 @@ func (s *OutputSuite) TestValidateKey(c *C) {
}
}

// FIXME: replace this with TestLogAndParse
e-sumin marked this conversation as resolved.
Show resolved Hide resolved
func parse(str string) (*Output, error) {
reader := io.NopCloser(strings.NewReader(str))
output, err := LogAndParse(context.Background(), reader)
var o *Output
for k, v := range output {
o = &Output{
Key: k,
Value: v.(string),
}
}
return o, err
}

func (s *OutputSuite) TestParseValid(c *C) {
key, val := "foo", "bar"
b := bytes.NewBuffer(nil)
err := fPrintOutput(b, key, val)
c.Check(err, IsNil)

o, err := Parse(b.String())
o, err := parse(b.String())
c.Assert(err, IsNil)
c.Assert(o, NotNil)
c.Assert(o.Key, Equals, key)
Expand All @@ -77,7 +94,7 @@ func (s *OutputSuite) TestParseNoOutput(c *C) {
checker: IsNil,
},
} {
o, err := Parse(tc.s)
o, err := parse(tc.s)
c.Assert(err, tc.checker)
c.Assert(o, IsNil)
}
Expand Down
Loading
Loading