Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logs via a browser #2235

Merged
merged 5 commits into from
Jan 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions command/agent/fs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ func (s *StreamFrame) IsCleared() bool {

// StreamFramer is used to buffer and send frames as well as heartbeat.
type StreamFramer struct {
// plainTxt determines whether we frame or just send plain text data.
plainTxt bool

out io.WriteCloser
enc *codec.Encoder
encLock sync.Mutex
Expand All @@ -281,8 +284,11 @@ type StreamFramer struct {
}

// NewStreamFramer creates a new stream framer that will output StreamFrames to
// the passed output.
func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer {
// the passed output. If plainTxt is set we do not frame and just batch plain
// text data.
func NewStreamFramer(out io.WriteCloser, plainTxt bool,
heartbeatRate, batchWindow time.Duration, frameSize int) *StreamFramer {

// Create a JSON encoder
enc := codec.NewEncoder(out, jsonHandle)

Expand All @@ -291,6 +297,7 @@ func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duratio
flusher := time.NewTicker(batchWindow)

return &StreamFramer{
plainTxt: plainTxt,
out: out,
enc: enc,
frameSize: frameSize,
Expand Down Expand Up @@ -390,6 +397,10 @@ OUTER:
func (s *StreamFramer) send(f *StreamFrame) error {
s.encLock.Lock()
defer s.encLock.Unlock()
if s.plainTxt {
_, err := io.Copy(s.out, bytes.NewReader(f.Data))
return err
}
return s.enc.Encode(f)
}

Expand Down Expand Up @@ -549,7 +560,7 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf
output := ioutils.NewWriteFlusher(resp)

// Create the framer
framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer := NewStreamFramer(output, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()

Expand Down Expand Up @@ -697,7 +708,7 @@ OUTER:
// applied. Defaults to "start".
func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var allocID, task, logType string
var follow bool
var plain, follow bool
var err error

q := req.URL.Query()
Expand All @@ -710,8 +721,16 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac
return nil, taskNotPresentErr
}

if follow, err = strconv.ParseBool(q.Get("follow")); err != nil {
return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err)
if followStr := q.Get("follow"); followStr != "" {
if follow, err = strconv.ParseBool(followStr); err != nil {
return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err)
}
}

if plainStr := q.Get("plain"); plainStr != "" {
if plain, err = strconv.ParseBool(plainStr); err != nil {
return nil, fmt.Errorf("Failed to parse plain field to boolean: %v", err)
}
}

logType = q.Get("type")
Expand Down Expand Up @@ -747,15 +766,15 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac
// Create an output that gets flushed on every write
output := ioutils.NewWriteFlusher(resp)

return nil, s.logs(follow, offset, origin, task, logType, fs, output)
return nil, s.logs(follow, plain, offset, origin, task, logType, fs, output)
}

func (s *HTTPServer) logs(follow bool, offset int64,
func (s *HTTPServer) logs(follow, plain bool, offset int64,
origin, task, logType string,
fs allocdir.AllocDirFS, output io.WriteCloser) error {

// Create the framer
framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer := NewStreamFramer(output, plain, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()

Expand Down
119 changes: 108 additions & 11 deletions command/agent/fs_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"reflect"
"runtime"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -118,7 +119,7 @@ func TestStreamFramer_Flush(t *testing.T) {
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond
sf := NewStreamFramer(wrappedW, hRate, bWindow, 100)
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100)
sf.Run()

// Create a decoder
Expand Down Expand Up @@ -186,7 +187,7 @@ func TestStreamFramer_Batch(t *testing.T) {
wrappedW := &WriteCloseChecker{WriteCloser: w}
// Ensure the batch window doesn't get hit
hRate, bWindow := 100*time.Millisecond, 500*time.Millisecond
sf := NewStreamFramer(wrappedW, hRate, bWindow, 3)
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 3)
sf.Run()

// Create a decoder
Expand Down Expand Up @@ -263,7 +264,7 @@ func TestStreamFramer_Heartbeat(t *testing.T) {
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond
sf := NewStreamFramer(wrappedW, hRate, bWindow, 100)
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 100)
sf.Run()

// Create a decoder
Expand Down Expand Up @@ -315,7 +316,7 @@ func TestStreamFramer_Order(t *testing.T) {
wrappedW := &WriteCloseChecker{WriteCloser: w}
// Ensure the batch window doesn't get hit
hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond
sf := NewStreamFramer(wrappedW, hRate, bWindow, 10)
sf := NewStreamFramer(wrappedW, false, hRate, bWindow, 10)
sf.Run()

// Create a decoder
Expand Down Expand Up @@ -401,6 +402,102 @@ func TestStreamFramer_Order(t *testing.T) {
}
}

// This test checks that frames are received in order
func TestStreamFramer_Order_PlainText(t *testing.T) {
// Create the stream framer
r, w := io.Pipe()
wrappedW := &WriteCloseChecker{WriteCloser: w}
// Ensure the batch window doesn't get hit
hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond
sf := NewStreamFramer(wrappedW, true, hRate, bWindow, 10)
sf.Run()

files := []string{"1", "2", "3", "4", "5"}
input := bytes.NewBuffer(make([]byte, 0, 100000))
for i := 0; i <= 1000; i++ {
str := strconv.Itoa(i) + ","
input.WriteString(str)
}

expected := bytes.NewBuffer(make([]byte, 0, 100000))
for _, _ = range files {
expected.Write(input.Bytes())
}
receivedBuf := bytes.NewBuffer(make([]byte, 0, 100000))

// Start the reader
resultCh := make(chan struct{})
go func() {
OUTER:
for {
if _, err := receivedBuf.ReadFrom(r); err != nil {
if strings.Contains(err.Error(), "closed pipe") {
resultCh <- struct{}{}
return
}
t.Fatalf("bad read: %v", err)
}

if expected.Len() != receivedBuf.Len() {
continue
}
expectedBytes := expected.Bytes()
actualBytes := receivedBuf.Bytes()
for i, e := range expectedBytes {
if a := actualBytes[i]; a != e {
continue OUTER
}
}
resultCh <- struct{}{}
return

}
}()

// Send the data
b := input.Bytes()
shards := 10
each := len(b) / shards
for _, f := range files {
for i := 0; i < shards; i++ {
l, r := each*i, each*(i+1)
if i == shards-1 {
r = len(b)
}

if err := sf.Send(f, "", b[l:r], 0); err != nil {
t.Fatalf("Send() failed %v", err)
}
}
}

// Ensure we get data
select {
case <-resultCh:
case <-time.After(10 * time.Duration(testutil.TestMultiplier()) * bWindow):
if expected.Len() != receivedBuf.Len() {
t.Fatalf("Got %v; want %v", expected.Len(), receivedBuf.Len())
}
expectedBytes := expected.Bytes()
actualBytes := receivedBuf.Bytes()
for i, e := range expectedBytes {
if a := actualBytes[i]; a != e {
t.Fatalf("Index %d; Got %q; want %q", i, a, e)
}
}
}

// Close the reader and wait. This should cause the runner to exit
if err := r.Close(); err != nil {
t.Fatalf("failed to close reader")
}

sf.Destroy()
if !wrappedW.Closed {
t.Fatalf("writer not closed")
}
}

func TestHTTP_Stream_MissingParams(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil)
Expand Down Expand Up @@ -467,7 +564,7 @@ func TestHTTP_Stream_NoFile(t *testing.T) {
ad := tempAllocDir(t)
defer os.RemoveAll(ad.AllocDir)

framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()

Expand Down Expand Up @@ -526,7 +623,7 @@ func TestHTTP_Stream_Modify(t *testing.T) {
t.Fatalf("write failed: %v", err)
}

framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer := NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()

Expand Down Expand Up @@ -607,7 +704,7 @@ func TestHTTP_Stream_Truncate(t *testing.T) {
t.Fatalf("write failed: %v", err)
}

framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer := NewStreamFramer(w, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()
defer framer.Destroy()

Expand Down Expand Up @@ -710,7 +807,7 @@ func TestHTTP_Stream_Delete(t *testing.T) {
t.Fatalf("write failed: %v", err)
}

framer := NewStreamFramer(wrappedW, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer := NewStreamFramer(wrappedW, false, streamHeartbeatRate, streamBatchWindow, streamFrameSize)
framer.Run()

// Start streaming
Expand Down Expand Up @@ -804,7 +901,7 @@ func TestHTTP_Logs_NoFollow(t *testing.T) {

// Start streaming logs
go func() {
if err := s.Server.logs(false, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
if err := s.Server.logs(false, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
t.Fatalf("logs() failed: %v", err)
}
}()
Expand Down Expand Up @@ -893,7 +990,7 @@ func TestHTTP_Logs_Follow(t *testing.T) {

// Start streaming logs
go func() {
if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
if err := s.Server.logs(true, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
t.Fatalf("logs() failed: %v", err)
}
}()
Expand Down Expand Up @@ -1006,7 +1103,7 @@ func BenchmarkHTTP_Logs_Follow(t *testing.B) {

// Start streaming logs
go func() {
if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
if err := s.Server.logs(true, false, 0, OriginStart, task, logType, ad, wrappedW); err != nil {
t.Fatalf("logs() failed: %v", err)
}
}()
Expand Down
5 changes: 5 additions & 0 deletions website/source/docs/http/client-fs.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ allocation was placed.
Origin can be either "start" or "end" and applies the offset relative to
either the start or end of the logs respectively. Defaults to "start".
</li>
<li>
<span class="param">plain</span>
A boolean of whether to return just the plain text without framing.
This can be usef when viewing logs in a browser.
</li>
</ul>
</dd>

Expand Down