Skip to content

Commit

Permalink
Merge pull request #4234 from hashicorp/b-4159
Browse files Browse the repository at this point in the history
Fix race in StreamFramer and truncation in api/AllocFS.Logs
  • Loading branch information
schmichael committed May 4, 2018
2 parents 94ebbf0 + 6858c52 commit f2f6dab
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 81 deletions.
12 changes: 10 additions & 2 deletions api/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,15 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
// * cancel: A channel that when closed, streaming will end.
//
// The return value is a channel that will emit StreamFrames as they are read.
// The chan will be closed when follow=false and the end of the file is
// reached.
//
// Unexpected (non-EOF) errors will be sent on the error chan.
func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string,
offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {

errCh := make(chan error, 1)

nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
errCh <- err
Expand Down Expand Up @@ -315,8 +320,11 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str
// Decode the next frame
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
errCh <- err
close(frames)
if err == io.EOF || err == io.ErrClosedPipe {
close(frames)
} else {
errCh <- err
}
return
}

Expand Down
148 changes: 148 additions & 0 deletions api/fs_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,162 @@
package api

import (
"bytes"
"fmt"
"io"
"reflect"
"strings"
"testing"
"time"

units "github.com/docker/go-units"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestFS_Logs is an end-to-end regression test for AllocFS.Logs: it runs a
// batch job whose task writes a large numbered payload to stdout via the
// mock driver, then streams the task's stdout back through the Logs API and
// verifies the content arrives complete and in order (guards against the
// truncation bug this commit fixes).
func TestFS_Logs(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start an agent running both server and client; capture the RPC port
	// so the client half can be pointed at the embedded server below.
	rpcPort := 0
	c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
		rpcPort = c.Ports.RPC
		c.Client = &testutil.ClientConfig{
			Enabled: true,
		}
	})
	defer s.Stop()

	//TODO There should be a way to connect the client to the servers in
	//makeClient above
	require.NoError(c.Agent().SetServers([]string{fmt.Sprintf("127.0.0.1:%d", rpcPort)}))

	// Block until exactly one node registers and reports ready; otherwise
	// the job registered below would have nowhere to be placed.
	index := uint64(0)
	testutil.WaitForResult(func() (bool, error) {
		nodes, qm, err := c.Nodes().List(&QueryOptions{WaitIndex: index})
		if err != nil {
			return false, err
		}
		index = qm.LastIndex
		if len(nodes) != 1 {
			return false, fmt.Errorf("expected 1 node but found: %s", pretty.Sprint(nodes))
		}
		if nodes[0].Status != "ready" {
			return false, fmt.Errorf("node not ready: %s", nodes[0].Status)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Build the expected payload: 80K numbered lines ("0\n", "1\n", ...).
	// Numbered lines make both missing bytes and out-of-order frames
	// detectable on the read side. Grown to 1MB up front to avoid
	// re-allocation while building.
	var input strings.Builder
	input.Grow(units.MB)
	lines := 80 * units.KB
	for i := 0; i < lines; i++ {
		fmt.Fprintf(&input, "%d\n", i)
	}

	// A batch job whose single task simply emits the generated input on
	// stdout via the mock driver's stdout_string config.
	job := &Job{
		ID:          helper.StringToPtr("TestFS_Logs"),
		Region:      helper.StringToPtr("global"),
		Datacenters: []string{"dc1"},
		Type:        helper.StringToPtr("batch"),
		TaskGroups: []*TaskGroup{
			{
				Name: helper.StringToPtr("TestFS_LogsGroup"),
				Tasks: []*Task{
					{
						Name:   "logger",
						Driver: "mock_driver",
						Config: map[string]interface{}{
							"stdout_string": input.String(),
						},
					},
				},
			},
		},
	}

	jobs := c.Jobs()
	jobResp, _, err := jobs.Register(job, nil)
	require.NoError(err)

	// Wait for the registration's evaluation to complete. A blocked eval
	// means placement failed outright, so fail fast in that case.
	index = jobResp.EvalCreateIndex
	evals := c.Evaluations()
	testutil.WaitForResult(func() (bool, error) {
		evalResp, qm, err := evals.Info(jobResp.EvalID, &QueryOptions{WaitIndex: index})
		if err != nil {
			return false, err
		}
		if evalResp.BlockedEval != "" {
			t.Fatalf("Eval blocked: %s", pretty.Sprint(evalResp))
		}
		index = qm.LastIndex
		if evalResp.Status != "complete" {
			return false, fmt.Errorf("eval status: %v", evalResp.Status)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	// Wait for the single allocation to run to completion so the full log
	// file exists before we start streaming it.
	allocID := ""
	testutil.WaitForResult(func() (bool, error) {
		allocs, _, err := jobs.Allocations(*job.ID, true, &QueryOptions{WaitIndex: index})
		if err != nil {
			return false, err
		}
		if len(allocs) != 1 {
			return false, fmt.Errorf("unexpected number of allocs: %d", len(allocs))
		}
		if allocs[0].ClientStatus != "complete" {
			return false, fmt.Errorf("alloc not complete: %s", allocs[0].ClientStatus)
		}
		allocID = allocs[0].ID
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})

	alloc, _, err := c.Allocations().Info(allocID, nil)
	require.NoError(err)

	// Stream the logs several times to make sure repeated reads of the
	// same completed log are each complete and correctly ordered.
	for i := 0; i < 3; i++ {
		stopCh := make(chan struct{})
		// NOTE(review): defer in a loop — all three stopChs are closed
		// only when the test function returns, so each iteration's
		// streaming goroutines may outlive the iteration. Harmless at
		// 3 iterations (follow=false closes frames at EOF anyway), but
		// worth knowing before raising the iteration count.
		defer close(stopCh)

		// follow=false: the frames chan is closed (nil frame received)
		// once EOF is reached, which is the loop's exit condition.
		frames, errors := c.AllocFS().Logs(alloc, false, "logger", "stdout", "start", 0, stopCh, nil)

		var result bytes.Buffer
	READ_FRAMES:
		for {
			select {
			case f := <-frames:
				if f == nil {
					break READ_FRAMES
				}
				result.Write(f.Data)
			case err := <-errors:
				// Don't Fatal here as the other assertions may
				// contain helpful information.
				t.Errorf("Error: %v", err)
			}
		}

		// Check length
		assert.Equal(t, input.Len(), result.Len(), "file size mismatch")

		// Check complete ordering.
		// NOTE(review): this inner i deliberately shadows the outer
		// stream-iteration i; it indexes lines, not stream attempts.
		for i := 0; i < lines; i++ {
			line, err := result.ReadBytes('\n')
			require.NoErrorf(err, "unexpected error on line %d: %v", i, err)
			require.Equal(fmt.Sprintf("%d\n", i), string(line))
		}
	}
}

func TestFS_FrameReader(t *testing.T) {
t.Parallel()
// Create a channel of the frames and a cancel channel
Expand Down
21 changes: 10 additions & 11 deletions client/fs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ OUTER:
streamErr = err
break OUTER
}
encoder.Reset(conn)
case <-ctx.Done():
break OUTER
}
Expand Down Expand Up @@ -405,8 +406,6 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {

frames := make(chan *sframer.StreamFrame, streamFramesBuffer)
errCh := make(chan error)
var buf bytes.Buffer
frameCodec := codec.NewEncoder(&buf, structs.JsonHandle)

// Start streaming
go func() {
Expand All @@ -423,20 +422,23 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
go func() {
for {
if _, err := conn.Read(nil); err != nil {
if err == io.EOF {
if err == io.EOF || err == io.ErrClosedPipe {
// One end of the pipe was explicitly closed, exit cleanly
cancel()
return
}
select {
case errCh <- err:
case <-ctx.Done():
return
}
return
}
}
}()

var streamErr error
buf := new(bytes.Buffer)
frameCodec := codec.NewEncoder(buf, structs.JsonHandle)
OUTER:
for {
select {
Expand All @@ -455,6 +457,7 @@ OUTER:
streamErr = err
break OUTER
}
frameCodec.Reset(buf)

resp.Payload = buf.Bytes()
buf.Reset()
Expand All @@ -464,6 +467,7 @@ OUTER:
streamErr = err
break OUTER
}
encoder.Reset(conn)
}
}

Expand Down Expand Up @@ -576,12 +580,7 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in
// #3342
select {
case <-framer.ExitCh():
err := parseFramerErr(framer.Err())
if err == syscall.EPIPE {
// EPIPE just means the connection was closed
return nil
}
return err
return nil
default:
}

Expand Down Expand Up @@ -705,7 +704,7 @@ OUTER:
lastEvent = truncateEvent
continue OUTER
case <-framer.ExitCh():
return parseFramerErr(framer.Err())
return nil
case <-ctx.Done():
return nil
case err, ok := <-eofCancelCh:
Expand Down
Loading

0 comments on commit f2f6dab

Please sign in to comment.