Skip to content

Commit

Permalink
Add log reader helper (#264)
Browse files Browse the repository at this point in the history
  • Loading branch information
tdmanv authored Sep 6, 2019
1 parent bac09fd commit af6363e
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 0 deletions.
41 changes: 41 additions & 0 deletions pkg/kube/log_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package kube

import (
"io"

"k8s.io/client-go/rest"
)

var _ io.ReadCloser = (*logReader)(nil)

func newLogReader(rw rest.ResponseWrapper) *logReader {
return &logReader{rw: rw}
}

// logReader defers a call to ResponseWrapper.Stream() until Read is called.
// This is to help handle kubernetes behavior where calls to
// Pod.GetLogs(...).Stream() will hang until at least one byte is returned.
// kubectl logs handles this in
// https://github.com/kubernetes/kubernetes/pull/67573/files#diff-12d472fe036bbe778e84dffc71564eb1R355
type logReader struct {
rw rest.ResponseWrapper
err error
rc io.ReadCloser
}

func (lr *logReader) Read(p []byte) (n int, err error) {
if lr.rc == nil {
lr.rc, lr.err = lr.rw.Stream()
}
if lr.err != nil {
return 0, lr.err
}
return lr.rc.Read(p)
}

func (lr *logReader) Close() error {
if lr.rc == nil {
return nil
}
return lr.rc.Close()
}
86 changes: 86 additions & 0 deletions pkg/kube/log_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package kube

import (
"bytes"
"fmt"
"io"
"io/ioutil"

. "gopkg.in/check.v1"
"k8s.io/client-go/rest"
)

type LogReaderSuite struct{}

var _ = Suite(&LogReaderSuite{})

var _ io.ReadCloser = (*buffer)(nil)

type buffer struct {
*bytes.Buffer
}

func (b buffer) Close() error {
return nil
}

var _ rest.ResponseWrapper = (*fakeResponseWrapper)(nil)

type fakeResponseWrapper struct {
err error
buf *bytes.Buffer
}

func (frw *fakeResponseWrapper) DoRaw() ([]byte, error) {
return nil, nil
}
func (frw *fakeResponseWrapper) Stream() (io.ReadCloser, error) {
return buffer{frw.buf}, frw.err
}

func (s *LogReaderSuite) TestLogReader(c *C) {
err := fmt.Errorf("TEST")
for _, tc := range []struct {
rw *fakeResponseWrapper
err error
out string
}{
{
rw: &fakeResponseWrapper{
err: nil,
buf: bytes.NewBuffer(nil),
},
err: nil,
out: "",
},
{
rw: &fakeResponseWrapper{
err: nil,
buf: bytes.NewBuffer([]byte("foo")),
},
err: nil,
out: "foo",
},
{
rw: &fakeResponseWrapper{
err: err,
buf: nil,
},
err: err,
out: "",
},
{
rw: &fakeResponseWrapper{
err: err,
buf: bytes.NewBuffer([]byte("foo")),
},
err: err,
out: "",
},
} {
lr := newLogReader(tc.rw)
out, err := ioutil.ReadAll(lr)
c.Assert(err, Equals, tc.err)
c.Assert(string(out), Equals, tc.out)
}
}

0 comments on commit af6363e

Please sign in to comment.