Skip to content

Commit

Permalink
Refactor Log Parsing (#255)
Browse files Browse the repository at this point in the history
  • Loading branch information
tdmanv committed Sep 4, 2019
1 parent ada0771 commit 42d113d
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 33 deletions.
17 changes: 1 addition & 16 deletions pkg/function/kube_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package function
import (
"context"
"regexp"
"strings"

"github.com/pkg/errors"

Expand Down Expand Up @@ -56,7 +55,7 @@ func parseLogAndCreateOutput(out string) (map[string]interface{}, error) {
var op map[string]interface{}
logs := regexp.MustCompile("[\n]").Split(out, -1)
for _, l := range logs {
opObj, err := parseLogLineForOutput(l)
opObj, err := output.Parse(l)
if err != nil {
return nil, err
}
Expand All @@ -71,20 +70,6 @@ func parseLogAndCreateOutput(out string) (map[string]interface{}, error) {
return op, nil
}

var outputRE = regexp.MustCompile(`###Phase-output###:(.*?)*$`)

func parseLogLineForOutput(l string) (*output.Output, error) {
if !strings.Contains(l, output.PhaseOpString) {
return nil, nil
}
match := outputRE.FindAllStringSubmatch(l, 1)
op, err := output.UnmarshalOutput(match[0][1])
if err != nil {
return nil, err
}
return op, nil
}

func (kef *kubeExecFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
cli, err := kube.NewClient()
if err != nil {
Expand Down
23 changes: 7 additions & 16 deletions pkg/function/kube_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
"github.com/kanisterio/kanister/pkg/format"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/output"
"github.com/kanisterio/kanister/pkg/param"
)

Expand Down Expand Up @@ -81,24 +81,15 @@ func kubeTaskPodFunc(cli kubernetes.Interface) func(ctx context.Context, pod *v1
if err != nil {
return nil, errors.Wrapf(err, "Failed to fetch logs from the pod")
}
defer r.Close()
logCh := format.LogStream(pod.Name, pod.Spec.Containers[0].Name, r)
// Wait for pod completion
err = kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name)
out, err := output.LogAndParse(ctx, r)
if err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name)
return nil, err
}
out := make(map[string]interface{})
for l := range logCh {
op, err := parseLogLineForOutput(l)
if err != nil {
return nil, errors.Wrap(err, "Failed to parse phase output")
}
if op != nil {
out[op.Key] = op.Value
}
// Wait for pod completion
if err := kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name); err != nil {
return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name)
}
return out, nil
return out, err
}
}

Expand Down
26 changes: 25 additions & 1 deletion pkg/output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ package output
import (
"encoding/json"
"fmt"
"io"
"os"
"regexp"
"strings"

"github.com/pkg/errors"
)
Expand Down Expand Up @@ -66,10 +69,31 @@ func ValidateKey(key string) error {

// PrintOutput runs the `kando output` command
func PrintOutput(key, value string) error {
return fPrintOutput(os.Stdout, key, value)
}

func fPrintOutput(w io.Writer, key, value string) error {
outString, err := marshalOutput(key, value)
if err != nil {
return err
}
fmt.Println(PhaseOpString, outString)
fmt.Fprintln(w, PhaseOpString, outString)
return nil
}

const reStr = PhaseOpString + `(.*)$`

var logRE = regexp.MustCompile(reStr)

func Parse(l string) (*Output, error) {
l = strings.TrimSpace(l)
match := logRE.FindAllStringSubmatch(l, 1)
if len(match) == 0 {
return nil, nil
}
op, err := UnmarshalOutput(match[0][1])
if err != nil {
return nil, err
}
return op, nil
}
39 changes: 39 additions & 0 deletions pkg/output/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package output

import (
"bytes"
"testing"

. "gopkg.in/check.v1"
Expand Down Expand Up @@ -43,3 +44,41 @@ func (s *OutputSuite) TestValidateKey(c *C) {
c.Check(err, tc.checker, Commentf("Key (%s) failed!", tc.key))
}
}

func (s *OutputSuite) TestParseValid(c *C) {
key, val := "foo", "bar"
b := bytes.NewBuffer(nil)
err := fPrintOutput(b, key, val)
c.Check(err, IsNil)

o, err := Parse(b.String())
c.Assert(err, IsNil)
c.Assert(o, NotNil)
c.Assert(o.Key, Equals, key)
c.Assert(o.Value, Equals, val)
}

func (s *OutputSuite) TestParseNoOutput(c *C) {
key, val := "foo", "bar"
b := bytes.NewBuffer(nil)
err := fPrintOutput(b, key, val)
c.Check(err, IsNil)
valid := b.String()
for _, tc := range []struct {
s string
checker Checker
}{
{
s: valid[0 : len(valid)-2],
checker: NotNil,
},
{
s: valid[1 : len(valid)-1],
checker: IsNil,
},
} {
o, err := Parse(tc.s)
c.Assert(err, tc.checker)
c.Assert(o, IsNil)
}
}
73 changes: 73 additions & 0 deletions pkg/output/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2019 The Kanister Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package output

import (
"bufio"
"context"
"io"
"strings"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

func splitLines(ctx context.Context, r io.ReadCloser, f func(context.Context, string) error) error {
// Call r.Close() if the context is canceled or if s.Scan() == false.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
<-ctx.Done()
_ = r.Close()
}()

// Scan log lines when ready.
s := bufio.NewScanner(r)
for s.Scan() {
l := s.Text()
l = strings.TrimSpace(l)
if l == "" {
continue
}
if err := f(ctx, l); err != nil {
return err
}
}
return errors.Wrap(s.Err(), "Split lines failed")
}

func LogAndParse(ctx context.Context, r io.ReadCloser) (map[string]interface{}, error) {
out := make(map[string]interface{})
err := splitLines(ctx, r, func(ctx context.Context, l string) error {
log.Info("Pod Out:", l)
o, err := Parse(l)
if err != nil {
return err
}
if o != nil {
out[o.Key] = o.Value
}
return nil
})
return out, err
}

func Log(ctx context.Context, r io.ReadCloser) error {
err := splitLines(ctx, r, func(ctx context.Context, l string) error {
log.Info("Pod Out:", l)
return nil
})
return err
}

0 comments on commit 42d113d

Please sign in to comment.