Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nomad logs command to stream task logs #1444

Merged
merged 16 commits into from
Jul 25, 2016
147 changes: 120 additions & 27 deletions api/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"net/url"
"strconv"
"sync"
"time"
)

Expand Down Expand Up @@ -204,6 +205,7 @@ func (a *AllocFS) getErrorMsg(resp *http.Response) error {
// * path: path to file to stream.
// * offset: The offset to start streaming data at.
// * origin: Either "start" or "end" and defines from where the offset is applied.
// * cancel: A channel that when closed, streaming will end.
//
// The return value is a channel that will emit StreamFrames as they are read.
func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
Expand Down Expand Up @@ -275,19 +277,101 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
return frames, nil, nil
}

// Logs streams the content of a tasks logs blocking on EOF.
// The parameters are:
// * allocation: the allocation to stream from.
// * follow: Whether the logs should be followed.
// * task: the tasks name to stream logs for.
// * logType: Either "stdout" or "stderr"
// * origin: Either "start" or "end" and defines from where the offset is applied.
// * offset: The offset to start streaming data at.
// * cancel: A channel that when closed, streaming will end.
//
// The return value is a channel that will emit StreamFrames as they are read.
func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string,
offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, *QueryMeta, error) {

node, _, err := a.client.Nodes().Info(alloc.NodeID, q)
if err != nil {
return nil, nil, err
}

if node.HTTPAddr == "" {
return nil, nil, fmt.Errorf("http addr of the node where alloc %q is running is not advertised", alloc.ID)
}
u := &url.URL{
Scheme: "http",
Host: node.HTTPAddr,
Path: fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID),
}
v := url.Values{}
v.Set("follow", strconv.FormatBool(follow))
v.Set("task", task)
v.Set("type", logType)
v.Set("origin", origin)
v.Set("offset", strconv.FormatInt(offset, 10))
u.RawQuery = v.Encode()
req := &http.Request{
Method: "GET",
URL: u,
Cancel: cancel,
}
c := http.Client{}
resp, err := c.Do(req)
if err != nil {
return nil, nil, err
}

// Create the output channel
frames := make(chan *StreamFrame, 10)

go func() {
// Close the body
defer resp.Body.Close()

// Create a decoder
dec := json.NewDecoder(resp.Body)

for {
// Check if we have been cancelled
select {
case <-cancel:
return
default:
}

// Decode the next frame
var frame StreamFrame
if err := dec.Decode(&frame); err != nil {
close(frames)
return
}

// Discard heartbeat frames
if frame.IsHeartbeat() {
continue
}

frames <- &frame
}
}()

return frames, nil, nil
}

// FrameReader is used to convert a stream of frames into a read closer.
type FrameReader struct {
frames <-chan *StreamFrame
cancelCh chan struct{}
closed bool

closedLock sync.Mutex
closed bool

unblockTime time.Duration

frame *StreamFrame
frameOffset int

// To handle printing the file events
fileEventOffset int
fileEvent []byte

byteOffset int
}

Expand All @@ -300,6 +384,12 @@ func NewFrameReader(frames <-chan *StreamFrame, cancelCh chan struct{}) *FrameRe
}
}

// SetUnblockTime sets the time to unblock and return zero bytes read. If the
// duration is unset or is zero or less, the read will block til data is read.
func (f *FrameReader) SetUnblockTime(d time.Duration) {
f.unblockTime = d
}

// Offset returns the offset into the stream.
func (f *FrameReader) Offset() int {
return f.byteOffset
Expand All @@ -308,32 +398,33 @@ func (f *FrameReader) Offset() int {
// Read reads the data of the incoming frames into the bytes buffer. Returns EOF
// when there are no more frames.
func (f *FrameReader) Read(p []byte) (n int, err error) {
if f.frame == nil {
frame, ok := <-f.frames
if !ok {
return 0, io.EOF
}
f.frame = frame

// Store the total offset into the file
f.byteOffset = int(f.frame.Offset)
f.closedLock.Lock()
closed := f.closed
f.closedLock.Unlock()
if closed {
return 0, io.EOF
}

if f.frame.FileEvent != "" && len(f.fileEvent) == 0 {
f.fileEvent = []byte(fmt.Sprintf("\nnomad: %q\n", f.frame.FileEvent))
f.fileEventOffset = 0
}
if f.frame == nil {
var unblock <-chan time.Time
if f.unblockTime.Nanoseconds() > 0 {
unblock = time.After(f.unblockTime)
}

// If there is a file event we inject it into the read stream
if l := len(f.fileEvent); l != 0 && l != f.fileEventOffset {
n = copy(p, f.fileEvent[f.fileEventOffset:])
f.fileEventOffset += n
return n, nil
}
select {
case frame, ok := <-f.frames:
if !ok {
return 0, io.EOF
}
f.frame = frame

if len(f.fileEvent) == f.fileEventOffset {
f.fileEvent = nil
f.fileEventOffset = 0
// Store the total offset into the file
f.byteOffset = int(f.frame.Offset)
case <-unblock:
return 0, nil
case <-f.cancelCh:
return 0, io.EOF
}
}

// Copy the data out of the frame and update our offset
Expand All @@ -351,6 +442,8 @@ func (f *FrameReader) Read(p []byte) (n int, err error) {

// Close cancels the stream of frames
func (f *FrameReader) Close() error {
f.closedLock.Lock()
defer f.closedLock.Unlock()
if f.closed {
return nil
}
Expand Down
37 changes: 37 additions & 0 deletions api/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"
"reflect"
"testing"
"time"
)

func TestFS_FrameReader(t *testing.T) {
Expand Down Expand Up @@ -73,3 +74,39 @@ func TestFS_FrameReader(t *testing.T) {
t.Fatalf("offset %d, wanted %d", r.Offset(), len(expected))
}
}

func TestFS_FrameReader_Unblock(t *testing.T) {
// Create a channel of the frames and a cancel channel
framesCh := make(chan *StreamFrame, 3)
cancelCh := make(chan struct{})

r := NewFrameReader(framesCh, cancelCh)
r.SetUnblockTime(10 * time.Millisecond)

// Read a little
p := make([]byte, 12)

n, err := r.Read(p)
if err != nil {
t.Fatalf("Read failed: %v", err)
}

if n != 0 {
t.Fatalf("should have unblocked")
}

// Unset the unblock
r.SetUnblockTime(0)

resultCh := make(chan struct{})
go func() {
r.Read(p)
close(resultCh)
}()

select {
case <-resultCh:
t.Fatalf("shouldn't have unblocked")
case <-time.After(300 * time.Millisecond):
}
}
11 changes: 8 additions & 3 deletions client/allocdir/alloc_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type AllocDirFS interface {
List(path string) ([]*AllocFileInfo, error)
Stat(path string) (*AllocFileInfo, error)
ReadAt(path string, offset int64) (io.ReadCloser, error)
BlockUntilExists(path string, t *tomb.Tomb) error
BlockUntilExists(path string, t *tomb.Tomb) chan error
ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error)
}

Expand Down Expand Up @@ -343,11 +343,16 @@ func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) {

// BlockUntilExists blocks until the passed file relative the allocation
// directory exists. The block can be cancelled with the passed tomb.
func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) error {
func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) chan error {
// Get the path relative to the alloc directory
p := filepath.Join(d.AllocDir, path)
watcher := getFileWatcher(p)
return watcher.BlockUntilExists(t)
returnCh := make(chan error, 1)
go func() {
returnCh <- watcher.BlockUntilExists(t)
close(returnCh)
}()
return returnCh
}

// ChangeEvents watches for changes to the passed path relative to the
Expand Down
Loading