diff --git a/api/fs.go b/api/fs.go index 36458c694d7c..dfb6b0f43ac5 100644 --- a/api/fs.go +++ b/api/fs.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "strconv" + "sync" "time" ) @@ -204,6 +205,7 @@ func (a *AllocFS) getErrorMsg(resp *http.Response) error { // * path: path to file to stream. // * offset: The offset to start streaming data at. // * origin: Either "start" or "end" and defines from where the offset is applied. +// * cancel: A channel that when closed, streaming will end. // // The return value is a channel that will emit StreamFrames as they are read. func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, @@ -275,19 +277,101 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, return frames, nil, nil } +// Logs streams the content of a tasks logs blocking on EOF. +// The parameters are: +// * allocation: the allocation to stream from. +// * follow: Whether the logs should be followed. +// * task: the tasks name to stream logs for. +// * logType: Either "stdout" or "stderr" +// * origin: Either "start" or "end" and defines from where the offset is applied. +// * offset: The offset to start streaming data at. +// * cancel: A channel that when closed, streaming will end. +// +// The return value is a channel that will emit StreamFrames as they are read. +func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string, + offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, *QueryMeta, error) { + + node, _, err := a.client.Nodes().Info(alloc.NodeID, q) + if err != nil { + return nil, nil, err + } + + if node.HTTPAddr == "" { + return nil, nil, fmt.Errorf("http addr of the node where alloc %q is running is not advertised", alloc.ID) + } + u := &url.URL{ + Scheme: "http", + Host: node.HTTPAddr, + Path: fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID), + } + v := url.Values{} + v.Set("follow", strconv.FormatBool(follow)) + v.Set("task", task) + v.Set("type", logType) + v.Set("origin", origin) + v.Set("offset", strconv.FormatInt(offset, 10)) + u.RawQuery = v.Encode() + req := &http.Request{ + Method: "GET", + URL: u, + Cancel: cancel, + } + c := http.Client{} + resp, err := c.Do(req) + if err != nil { + return nil, nil, err + } + + // Create the output channel + frames := make(chan *StreamFrame, 10) + + go func() { + // Close the body + defer resp.Body.Close() + + // Create a decoder + dec := json.NewDecoder(resp.Body) + + for { + // Check if we have been cancelled + select { + case <-cancel: + return + default: + } + + // Decode the next frame + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + close(frames) + return + } + + // Discard heartbeat frames + if frame.IsHeartbeat() { + continue + } + + frames <- &frame + } + }() + + return frames, nil, nil +} + // FrameReader is used to convert a stream of frames into a read closer. type FrameReader struct { frames <-chan *StreamFrame cancelCh chan struct{} - closed bool + + closedLock sync.Mutex + closed bool + + unblockTime time.Duration frame *StreamFrame frameOffset int - // To handle printing the file events - fileEventOffset int - fileEvent []byte - byteOffset int } @@ -300,6 +384,12 @@ func NewFrameReader(frames <-chan *StreamFrame, cancelCh chan struct{}) *FrameRe } } +// SetUnblockTime sets the time to unblock and return zero bytes read. If the +// duration is unset or is zero or less, the read will block til data is read. 
+func (f *FrameReader) SetUnblockTime(d time.Duration) { + f.unblockTime = d +} + // Offset returns the offset into the stream. func (f *FrameReader) Offset() int { return f.byteOffset @@ -308,32 +398,33 @@ func (f *FrameReader) Offset() int { // Read reads the data of the incoming frames into the bytes buffer. Returns EOF // when there are no more frames. func (f *FrameReader) Read(p []byte) (n int, err error) { - if f.frame == nil { - frame, ok := <-f.frames - if !ok { - return 0, io.EOF - } - f.frame = frame - - // Store the total offset into the file - f.byteOffset = int(f.frame.Offset) + f.closedLock.Lock() + closed := f.closed + f.closedLock.Unlock() + if closed { + return 0, io.EOF } - if f.frame.FileEvent != "" && len(f.fileEvent) == 0 { - f.fileEvent = []byte(fmt.Sprintf("\nnomad: %q\n", f.frame.FileEvent)) - f.fileEventOffset = 0 - } + if f.frame == nil { + var unblock <-chan time.Time + if f.unblockTime.Nanoseconds() > 0 { + unblock = time.After(f.unblockTime) + } - // If there is a file event we inject it into the read stream - if l := len(f.fileEvent); l != 0 && l != f.fileEventOffset { - n = copy(p, f.fileEvent[f.fileEventOffset:]) - f.fileEventOffset += n - return n, nil - } + select { + case frame, ok := <-f.frames: + if !ok { + return 0, io.EOF + } + f.frame = frame - if len(f.fileEvent) == f.fileEventOffset { - f.fileEvent = nil - f.fileEventOffset = 0 + // Store the total offset into the file + f.byteOffset = int(f.frame.Offset) + case <-unblock: + return 0, nil + case <-f.cancelCh: + return 0, io.EOF + } } // Copy the data out of the frame and update our offset @@ -351,6 +442,8 @@ func (f *FrameReader) Read(p []byte) (n int, err error) { // Close cancels the stream of frames func (f *FrameReader) Close() error { + f.closedLock.Lock() + defer f.closedLock.Unlock() if f.closed { return nil } diff --git a/api/fs_test.go b/api/fs_test.go index 25107d9c88ed..bea8d5984420 100644 --- a/api/fs_test.go +++ b/api/fs_test.go @@ -4,6 +4,7 @@ import ( "io" "reflect" "testing" + "time" ) func TestFS_FrameReader(t *testing.T) { @@ -73,3 +74,39 @@ func TestFS_FrameReader(t *testing.T) { t.Fatalf("offset %d, wanted %d", r.Offset(), len(expected)) } } + +func TestFS_FrameReader_Unblock(t *testing.T) { + // Create a channel of the frames and a cancel channel + framesCh := make(chan *StreamFrame, 3) + cancelCh := make(chan struct{}) + + r := NewFrameReader(framesCh, cancelCh) + r.SetUnblockTime(10 * time.Millisecond) + + // Read a little + p := make([]byte, 12) + + n, err := r.Read(p) + if err != nil { + t.Fatalf("Read failed: %v", err) + } + + if n != 0 { + t.Fatalf("should have unblocked") + } + + // Unset the unblock + r.SetUnblockTime(0) + + resultCh := make(chan struct{}) + go func() { + r.Read(p) + close(resultCh) + }() + + select { + case <-resultCh: + t.Fatalf("shouldn't have unblocked") + case <-time.After(300 * time.Millisecond): + } +} diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index 9f6c44c923b0..514f05cfbd6a 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -61,7 +61,7 @@ type AllocDirFS interface { List(path string) ([]*AllocFileInfo, error) Stat(path string) (*AllocFileInfo, error) ReadAt(path string, offset int64) (io.ReadCloser, error) - BlockUntilExists(path string, t *tomb.Tomb) error + BlockUntilExists(path string, t *tomb.Tomb) chan error ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error) } @@ -343,11 +343,16 @@ func (d *AllocDir) ReadAt(path string, offset int64) 
(io.ReadCloser, error) { // BlockUntilExists blocks until the passed file relative the allocation // directory exists. The block can be cancelled with the passed tomb. -func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) error { +func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) chan error { // Get the path relative to the alloc directory p := filepath.Join(d.AllocDir, path) watcher := getFileWatcher(p) - return watcher.BlockUntilExists(t) + returnCh := make(chan error, 1) + go func() { + returnCh <- watcher.BlockUntilExists(t) + close(returnCh) + }() + return returnCh } // ChangeEvents watches for changes to the passed path relative to the diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 6375f4bd5971..3a1ca361a972 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -4,10 +4,16 @@ import ( "bytes" "fmt" "io" + "math" + "net" "net/http" + "os" + "path/filepath" + "sort" "strconv" "strings" "sync" + "syscall" "time" "gopkg.in/tomb.v1" @@ -21,6 +27,8 @@ import ( var ( allocIDNotPresentErr = fmt.Errorf("must provide a valid alloc id") fileNameNotPresentErr = fmt.Errorf("must provide a file name") + taskNotPresentErr = fmt.Errorf("must provide task name") + logTypeNotPresentErr = fmt.Errorf("must provide log type (stdout/stderr)") clientNotRunning = fmt.Errorf("node is not running a Nomad Client") invalidOrigin = fmt.Errorf("origin must be start or end") ) @@ -31,14 +39,28 @@ const ( // streamHeartbeatRate is the rate at which a heartbeat will occur to detect // a closed connection without sending any additional data - streamHeartbeatRate = 10 * time.Second + streamHeartbeatRate = 1 * time.Second // streamBatchWindow is the window in which file content is batched before // being flushed if the frame size has not been hit. streamBatchWindow = 200 * time.Millisecond + // nextLogCheckRate is the rate at which we check for a log entry greater + // than what we are watching for. This is to handle the case in which logs + // rotate faster than we can detect and we have to rely on a normal + // directory listing. + nextLogCheckRate = 100 * time.Millisecond + + // deleteEvent and truncateEvent are the file events that can be sent in a + // StreamFrame deleteEvent = "file deleted" truncateEvent = "file truncated" + + // OriginStart and OriginEnd are the available parameters for the origin + // argument when streaming a file. They respectively offset from the start + // and end of a file. + OriginStart = "start" + OriginEnd = "end" ) func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { @@ -58,6 +80,8 @@ func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (int return s.FileCatRequest(resp, req) case strings.HasPrefix(path, "stream/"): return s.Stream(resp, req) + case strings.HasPrefix(path, "logs/"): + return s.Logs(resp, req) default: return nil, CodedError(404, ErrInvalidMethod) } @@ -223,7 +247,7 @@ type StreamFramer struct { // Captures whether the framer is running and any error that occurred to // cause it to stop. 
running bool - err error + Err error } // NewStreamFramer creates a new stream framer that will output StreamFrames to @@ -252,18 +276,16 @@ func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duratio // Destroy is used to cleanup the StreamFramer and flush any pending frames func (s *StreamFramer) Destroy() { s.l.Lock() - wasRunning := s.running - s.running = false - s.f = nil close(s.shutdownCh) s.heartbeat.Stop() s.flusher.Stop() s.l.Unlock() // Ensure things were flushed - if wasRunning { + if s.running { <-s.exitCh } + s.out.Close() } // Run starts a long lived goroutine that handles sending data as well as @@ -290,11 +312,12 @@ func (s *StreamFramer) run() { // Store any error and mark it as not running var err error defer func() { - s.l.Lock() - s.err = err - s.out.Close() close(s.exitCh) + + s.l.Lock() close(s.outbound) + s.Err = err + s.running = false s.l.Unlock() }() @@ -303,6 +326,8 @@ func (s *StreamFramer) run() { go func() { for { select { + case <-s.exitCh: + return case <-s.shutdownCh: return case <-s.flusher.C: @@ -315,13 +340,18 @@ func (s *StreamFramer) run() { // Read the data for the frame, and send it s.f.Data = s.readData() - s.outbound <- s.f - s.f = nil - + select { + case s.outbound <- s.f: + s.f = nil + case <-s.exitCh: + } s.l.Unlock() case <-s.heartbeat.C: // Send a heartbeat frame - s.outbound <- &StreamFrame{} + select { + case s.outbound <- &StreamFrame{}: + default: + } } } }() @@ -332,7 +362,7 @@ OUTER: case <-s.shutdownCh: break OUTER case o := <-s.outbound: - // Send the frame and then clear the current working frame + // Send the frame if err = s.enc.Encode(o); err != nil { return } @@ -340,21 +370,25 @@ OUTER: } // Flush any existing frames - s.l.Lock() - defer s.l.Unlock() - select { - case o := <-s.outbound: - // Send the frame and then clear the current working frame - if err = s.enc.Encode(o); err != nil { - return +FLUSH: + for { + select { + case o := <-s.outbound: + // Send the frame and then clear the current working frame + if err = s.enc.Encode(o); err != nil { + return + } + default: + break FLUSH } - default: } + s.l.Lock() if s.f != nil { s.f.Data = s.readData() s.enc.Encode(s.f) } + s.l.Unlock() } // readData is a helper which reads the buffered data returning up to the frame @@ -369,7 +403,10 @@ func (s *StreamFramer) readData() []byte { if size == 0 { return nil } - return s.data.Next(size) + d := s.data.Next(size) + b := make([]byte, size) + copy(b, d) + return b } // Send creates and sends a StreamFrame based on the passed parameters. An error @@ -382,8 +419,8 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e // If we are not running, return the error that caused us to not run or // indicated that it was never started. if !s.running { - if s.err != nil { - return s.err + if s.Err != nil { + return s.Err } return fmt.Errorf("StreamFramer not running") } @@ -391,13 +428,14 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e // Check if not mergeable if s.f != nil && (s.f.File != file || s.f.FileEvent != fileEvent) { // Flush the old frame - s.outbound <- &StreamFrame{ - Offset: s.f.Offset, - File: s.f.File, - FileEvent: s.f.FileEvent, - Data: s.readData(), + f := *s.f + f.Data = s.readData() + select { + case <-s.exitCh: + return nil + case s.outbound <- &f: + s.f = nil } - s.f = nil } // Store the new data as the current frame. 
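For orientation while reading the framer changes above: the sketch below shows the full StreamFramer lifecycle (construct, Run, Send, Destroy) as it might appear in a test inside command/agent, mirroring TestStreamFramer_Order further down in this patch. The test name, rates, frame size, and file path are illustrative and are not taken from the patch itself.

package agent

import (
	"io"
	"os"
	"testing"
	"time"

	"github.com/ugorji/go/codec"
)

// Illustrative only: exercises the framer lifecycle used by the stream and
// logs endpoints. The rates, frame size, and file name are made up.
func TestStreamFramer_LifecycleSketch(t *testing.T) {
	r, w := io.Pipe()
	framer := NewStreamFramer(w, 1*time.Second, 200*time.Millisecond, 1024)
	framer.Run()
	defer framer.Destroy()

	// Consume encoded frames from the other end of the pipe, skipping heartbeats.
	go func() {
		dec := codec.NewDecoder(r, jsonHandle)
		for {
			var frame StreamFrame
			if err := dec.Decode(&frame); err != nil {
				return
			}
			if frame.IsHeartbeat() {
				continue
			}
			os.Stdout.Write(frame.Data)
		}
	}()

	// Hand file content to the framer; it batches the data and flushes a frame
	// on the batch window or once the frame size is reached.
	if err := framer.Send("alloc/logs/web.stdout.0", "", []byte("hello\n"), 0); err != nil {
		t.Fatalf("Send() failed: %v", err)
	}
}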
@@ -414,21 +452,30 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e // Handle the delete case in which there is no data if s.data.Len() == 0 && s.f.FileEvent != "" { - s.outbound <- &StreamFrame{ + select { + case <-s.exitCh: + return nil + case s.outbound <- &StreamFrame{ Offset: s.f.Offset, File: s.f.File, FileEvent: s.f.FileEvent, + }: } } // Flush till we are under the max frame size for s.data.Len() >= s.frameSize { // Create a new frame to send it - s.outbound <- &StreamFrame{ + d := s.readData() + select { + case <-s.exitCh: + return nil + case s.outbound <- &StreamFrame{ Offset: s.f.Offset, File: s.f.File, FileEvent: s.f.FileEvent, - Data: s.readData(), + Data: d, + }: } } @@ -499,10 +546,26 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf // Create an output that gets flushed on every write output := ioutils.NewWriteFlusher(resp) - return nil, s.stream(offset, path, fs, output) + // Create the framer + framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + + err = s.stream(offset, path, fs, framer, nil) + if err != nil && err != syscall.EPIPE { + return nil, err + } + + return nil, nil } -func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, output io.WriteCloser) error { +// stream is the internal method to stream the content of a file. eofCancelCh is +// used to cancel the stream if triggered while at EOF. If the connection is +// broken an EPIPE error is returned +func (s *HTTPServer) stream(offset int64, path string, + fs allocdir.AllocDirFS, framer *StreamFramer, + eofCancelCh chan error) error { + // Get the reader f, err := fs.ReadAt(path, offset) if err != nil { @@ -517,11 +580,6 @@ func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, o t.Done() }() - // Create the framer - framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) - framer.Run() - defer framer.Destroy() - // Create a variable to allow setting the last event var lastEvent string @@ -547,6 +605,22 @@ OUTER: // Send the frame if n != 0 { if err := framer.Send(path, lastEvent, data[:n], offset); err != nil { + + // Check if the connection has been closed + if err == io.ErrClosedPipe { + // The pipe check is for tests + return syscall.EPIPE + } + + operr, ok := err.(*net.OpError) + if ok { + // The connection was closed by our peer + e := operr.Err.Error() + if strings.Contains(e, syscall.EPIPE.Error()) || strings.Contains(e, syscall.ECONNRESET.Error()) { + return syscall.EPIPE + } + } + return err } } @@ -595,9 +669,340 @@ OUTER: continue OUTER case <-framer.ExitCh(): return nil + case err, ok := <-eofCancelCh: + if !ok { + return nil + } + + return err } } } return nil } + +// Logs streams the content of a log blocking on EOF. The parameters are: +// * task: task name to stream logs for. +// * type: stdout/stderr to stream. +// * follow: A boolean of whether to follow the logs. +// * offset: The offset to start streaming data at, defaults to zero. +// * origin: Either "start" or "end" and defines from where the offset is +// applied. Defaults to "start". 
+func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + var allocID, task, logType string + var follow bool + var err error + + q := req.URL.Query() + + if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/logs/"); allocID == "" { + return nil, allocIDNotPresentErr + } + + if task = q.Get("task"); task == "" { + return nil, taskNotPresentErr + } + + if follow, err = strconv.ParseBool(q.Get("follow")); err != nil { + return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err) + } + + logType = q.Get("type") + switch logType { + case "stdout", "stderr": + default: + return nil, logTypeNotPresentErr + } + + var offset int64 + offsetString := q.Get("offset") + if offsetString != "" { + var err error + if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil { + return nil, fmt.Errorf("error parsing offset: %v", err) + } + } + + origin := q.Get("origin") + switch origin { + case "start", "end": + case "": + origin = "start" + default: + return nil, invalidOrigin + } + + fs, err := s.agent.client.GetAllocFS(allocID) + if err != nil { + return nil, err + } + + // Create an output that gets flushed on every write + output := ioutils.NewWriteFlusher(resp) + + return nil, s.logs(follow, offset, origin, task, logType, fs, output) +} + +func (s *HTTPServer) logs(follow bool, offset int64, + origin, task, logType string, + fs allocdir.AllocDirFS, output io.WriteCloser) error { + + // Create the framer + framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + + // Path to the logs + logPath := filepath.Join(allocdir.SharedAllocName, allocdir.LogDirName) + + // nextIdx is the next index to read logs from + var nextIdx int64 + switch origin { + case "start": + nextIdx = 0 + case "end": + nextIdx = math.MaxInt64 + offset *= -1 + default: + return invalidOrigin + } + + // Create a tomb to cancel watch events + t := tomb.Tomb{} + defer func() { + t.Kill(nil) + t.Done() + }() + + for { + // Logic for picking next file is: + // 1) List log files + // 2) Pick log file closest to desired index + // 3) Open log file at correct offset + // 3a) No error, read contents + // 3b) If file doesn't exist, goto 1 as it may have been rotated out + entries, err := fs.List(logPath) + if err != nil { + return fmt.Errorf("failed to list entries: %v", err) + } + + // If we are not following logs, determine the max index for the logs we are + // interested in so we can stop there. + maxIndex := int64(math.MaxInt64) + if !follow { + _, idx, _, err := findClosest(entries, maxIndex, 0, task, logType) + if err != nil { + return err + } + maxIndex = idx + } + + logEntry, idx, openOffset, err := findClosest(entries, nextIdx, offset, task, logType) + if err != nil { + return err + } + + var eofCancelCh chan error + exitAfter := false + if !follow && idx > maxIndex { + // Exceeded what was there initially so return + return nil + } else if !follow && idx == maxIndex { + // At the end + eofCancelCh = make(chan error) + close(eofCancelCh) + exitAfter = true + } else { + eofCancelCh = blockUntilNextLog(fs, &t, logPath, task, logType, idx+1) + } + + p := filepath.Join(logPath, logEntry.Name) + err = s.stream(openOffset, p, fs, framer, eofCancelCh) + + if err != nil { + // Check if there was an error where the file does not exist. That means + // it got rotated out from under us. 
+ if os.IsNotExist(err) { + continue + } + + // Check if the connection was closed + if err == syscall.EPIPE { + return nil + } + + return fmt.Errorf("failed to stream %q: %v", p, err) + } + + if exitAfter { + return nil + } + + //Since we successfully streamed, update the overall offset/idx. + offset = int64(0) + nextIdx = idx + 1 + } + + return nil +} + +// blockUntilNextLog returns a channel that will have data sent when the next +// log index or anything greater is created. +func blockUntilNextLog(fs allocdir.AllocDirFS, t *tomb.Tomb, logPath, task, logType string, nextIndex int64) chan error { + nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, nextIndex)) + next := make(chan error, 1) + + go func() { + eofCancelCh := fs.BlockUntilExists(nextPath, t) + scanCh := time.Tick(nextLogCheckRate) + for { + select { + case err := <-eofCancelCh: + next <- err + close(next) + return + case <-scanCh: + entries, err := fs.List(logPath) + if err != nil { + next <- fmt.Errorf("failed to list entries: %v", err) + close(next) + return + } + + indexes, err := logIndexes(entries, task, logType) + if err != nil { + next <- err + close(next) + return + } + + // Scan and see if there are any entries larger than what we are + // waiting for. + for _, entry := range indexes { + if entry.idx >= nextIndex { + next <- nil + close(next) + return + } + } + } + } + }() + + return next +} + +// indexTuple and indexTupleArray are used to find the correct log entry to +// start streaming logs from +type indexTuple struct { + idx int64 + entry *allocdir.AllocFileInfo +} + +type indexTupleArray []indexTuple + +func (a indexTupleArray) Len() int { return len(a) } +func (a indexTupleArray) Less(i, j int) bool { return a[i].idx < a[j].idx } +func (a indexTupleArray) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +// logIndexes takes a set of entries and returns a indexTupleArray of +// the desired log file entries. If the indexes could not be determined, an +// error is returned. +func logIndexes(entries []*allocdir.AllocFileInfo, task, logType string) (indexTupleArray, error) { + var indexes []indexTuple + prefix := fmt.Sprintf("%s.%s.", task, logType) + for _, entry := range entries { + if entry.IsDir { + continue + } + + // If nothing was trimmed, then it is not a match + idxStr := strings.TrimPrefix(entry.Name, prefix) + if idxStr == entry.Name { + continue + } + + // Convert to an int + idx, err := strconv.Atoi(idxStr) + if err != nil { + return nil, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err) + } + + indexes = append(indexes, indexTuple{idx: int64(idx), entry: entry}) + } + + return indexTupleArray(indexes), nil +} + +// findClosest takes a list of entries, the desired log index and desired log +// offset (which can be negative, treated as offset from end), task name and log +// type and returns the log entry, the log index, the offset to read from and a +// potential error. 
+func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx, desiredOffset int64, + task, logType string) (*allocdir.AllocFileInfo, int64, int64, error) { + + // Build the matching indexes + indexes, err := logIndexes(entries, task, logType) + if err != nil { + return nil, 0, 0, err + } + if len(indexes) == 0 { + return nil, 0, 0, fmt.Errorf("log entry for task %q and log type %q not found", task, logType) + } + + // Binary search the indexes to get the desiredIdx + sort.Sort(indexTupleArray(indexes)) + i := sort.Search(len(indexes), func(i int) bool { return indexes[i].idx >= desiredIdx }) + l := len(indexes) + if i == l { + // Use the last index if the number is bigger than all of them. + i = l - 1 + } + + // Get to the correct offset + offset := desiredOffset + idx := int64(i) + for { + s := indexes[idx].entry.Size + + // Base case + if offset == 0 { + break + } else if offset < 0 { + // Going backwards + if newOffset := s + offset; newOffset >= 0 { + // Current file works + offset = newOffset + break + } else if idx == 0 { + // Already at the end + offset = 0 + break + } else { + // Try the file before + offset = newOffset + idx -= 1 + continue + } + } else { + // Going forward + if offset <= s { + // Current file works + break + } else if idx == int64(l-1) { + // Already at the end + offset = s + break + } else { + // Try the next file + offset = offset - s + idx += 1 + continue + } + + } + } + + return indexes[idx].entry, indexes[idx].idx, offset, nil +} diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index aaf1f8b4dfb6..2659bb88cef2 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -1,18 +1,24 @@ package agent import ( + "bytes" + "fmt" "io" "io/ioutil" + "math" "net/http" "net/http/httptest" "os" "path/filepath" "reflect" + "strconv" + "strings" "testing" "time" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/testutil" + "github.com/pmezard/go-difflib/difflib" "github.com/ugorji/go/codec" ) @@ -302,6 +308,106 @@ func TestStreamFramer_Heartbeat(t *testing.T) { } } +// This test checks that frames are received in order +func TestStreamFramer_Order(t *testing.T) { + // Create the stream framer + r, w := io.Pipe() + wrappedW := &WriteCloseChecker{WriteCloser: w} + // Ensure the batch window doesn't get hit + hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond + sf := NewStreamFramer(wrappedW, hRate, bWindow, 10) + sf.Run() + + // Create a decoder + dec := codec.NewDecoder(r, jsonHandle) + + files := []string{"1", "2", "3", "4", "5"} + input := bytes.NewBuffer(make([]byte, 0, 100000)) + for i := 0; i <= 1000; i++ { + str := strconv.Itoa(i) + "," + input.WriteString(str) + } + + expected := bytes.NewBuffer(make([]byte, 0, 100000)) + for _, _ = range files { + expected.Write(input.Bytes()) + } + receivedBuf := bytes.NewBuffer(make([]byte, 0, 100000)) + + // Start the reader + resultCh := make(chan struct{}) + go func() { + for { + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + t.Fatalf("failed to decode") + } + + if frame.IsHeartbeat() { + continue + } + + receivedBuf.Write(frame.Data) + + if reflect.DeepEqual(expected, receivedBuf) { + resultCh <- struct{}{} + return + } + } + }() + + // Send the data + b := input.Bytes() + shards := 10 + each := len(b) / shards + for _, f := range files { + for i := 0; i < shards; i++ { + l, r := each*i, each*(i+1) + if i == shards-1 { + r = len(b) + } + + if err := sf.Send(f, "", b[l:r], 0); err != nil { + 
t.Fatalf("Send() failed %v", err) + } + } + } + + // Ensure we get data + select { + case <-resultCh: + case <-time.After(10 * bWindow): + got := receivedBuf.String() + want := expected.String() + diff := difflib.ContextDiff{ + A: difflib.SplitLines(strings.Replace(got, ",", "\n", -1)), + B: difflib.SplitLines(strings.Replace(want, ",", "\n", -1)), + FromFile: "Got", + ToFile: "Want", + Context: 3, + Eol: "\n", + } + result, _ := difflib.GetContextDiffString(diff) + t.Fatalf(strings.Replace(result, "\t", " ", -1)) + } + + // Close the reader and wait. This should cause the runner to exit + if err := r.Close(); err != nil { + t.Fatalf("failed to close reader") + } + + select { + case <-sf.ExitCh(): + case <-time.After(2 * hRate): + t.Fatalf("exit channel should close") + } + + sf.Destroy() + if !wrappedW.Closed { + t.Fatalf("writer not closed") + } +} + func TestHTTP_Stream_MissingParams(t *testing.T) { httpTest(t, nil, func(s *TestServer) { req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil) @@ -347,6 +453,10 @@ func tempAllocDir(t *testing.T) *allocdir.AllocDir { t.Fatalf("TempDir() failed: %v", err) } + if err := os.Chmod(dir, 0777); err != nil { + t.Fatalf("failed to chmod dir: %v", err) + } + return allocdir.NewAllocDir(dir) } @@ -364,7 +474,11 @@ func TestHTTP_Stream_NoFile(t *testing.T) { ad := tempAllocDir(t) defer os.RemoveAll(ad.AllocDir) - if err := s.Server.stream(0, "foo", ad, nopWriteCloser{ioutil.Discard}); err == nil { + framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + + if err := s.Server.stream(0, "foo", ad, framer, nil); err == nil { t.Fatalf("expected an error when streaming unknown file") } }) @@ -419,9 +533,13 @@ func TestHTTP_Stream_Modify(t *testing.T) { t.Fatalf("write failed: %v", err) } + framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + // Start streaming go func() { - if err := s.Server.stream(0, streamFile, ad, w); err != nil { + if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -496,9 +614,13 @@ func TestHTTP_Stream_Truncate(t *testing.T) { t.Fatalf("write failed: %v", err) } + framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + // Start streaming go func() { - if err := s.Server.stream(0, streamFile, ad, w); err != nil { + if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -595,9 +717,12 @@ func TestHTTP_Stream_Delete(t *testing.T) { t.Fatalf("write failed: %v", err) } + framer := NewStreamFramer(wrappedW, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + // Start streaming go func() { - if err := s.Server.stream(0, streamFile, ad, wrappedW); err != nil { + if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -615,6 +740,88 @@ func TestHTTP_Stream_Delete(t *testing.T) { t.Fatalf("did not receive delete") } + framer.Destroy() + testutil.WaitForResult(func() (bool, error) { + return wrappedW.Closed, nil + }, func(err error) { + t.Fatalf("connection not closed") + }) + + }) +} + +func TestHTTP_Logs_NoFollow(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + // Get a temp alloc dir and create the log dir + ad := tempAllocDir(t) + defer os.RemoveAll(ad.AllocDir) + + 
logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName) + if err := os.MkdirAll(logDir, 0777); err != nil { + t.Fatalf("Failed to make log dir: %v", err) + } + + // Create a series of log files in the temp dir + task := "foo" + logType := "stdout" + expected := []byte("012") + for i := 0; i < 3; i++ { + logFile := fmt.Sprintf("%s.%s.%d", task, logType, i) + logFilePath := filepath.Join(logDir, logFile) + err := ioutil.WriteFile(logFilePath, expected[i:i+1], 777) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + } + + // Create a decoder + r, w := io.Pipe() + wrappedW := &WriteCloseChecker{WriteCloser: w} + defer r.Close() + defer w.Close() + dec := codec.NewDecoder(r, jsonHandle) + + var received []byte + + // Start the reader + resultCh := make(chan struct{}) + go func() { + for { + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + if err == io.EOF { + t.Logf("EOF") + return + } + + t.Fatalf("failed to decode: %v", err) + } + + if frame.IsHeartbeat() { + continue + } + + received = append(received, frame.Data...) + if reflect.DeepEqual(received, expected) { + close(resultCh) + return + } + } + }() + + // Start streaming logs + go func() { + if err := s.Server.logs(false, 0, OriginStart, task, logType, ad, wrappedW); err != nil { + t.Fatalf("logs() failed: %v", err) + } + }() + + select { + case <-resultCh: + case <-time.After(4 * streamBatchWindow): + t.Fatalf("did not receive data: got %q", string(received)) + } + testutil.WaitForResult(func() (bool, error) { return wrappedW.Closed, nil }, func(err error) { @@ -623,3 +830,311 @@ func TestHTTP_Stream_Delete(t *testing.T) { }) } + +func TestHTTP_Logs_Follow(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + // Get a temp alloc dir and create the log dir + ad := tempAllocDir(t) + defer os.RemoveAll(ad.AllocDir) + + logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName) + if err := os.MkdirAll(logDir, 0777); err != nil { + t.Fatalf("Failed to make log dir: %v", err) + } + + // Create a series of log files in the temp dir + task := "foo" + logType := "stdout" + expected := []byte("012345") + initialWrites := 3 + + writeToFile := func(index int, data []byte) { + logFile := fmt.Sprintf("%s.%s.%d", task, logType, index) + logFilePath := filepath.Join(logDir, logFile) + err := ioutil.WriteFile(logFilePath, data, 777) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + } + for i := 0; i < initialWrites; i++ { + writeToFile(i, expected[i:i+1]) + } + + // Create a decoder + r, w := io.Pipe() + wrappedW := &WriteCloseChecker{WriteCloser: w} + defer r.Close() + defer w.Close() + dec := codec.NewDecoder(r, jsonHandle) + + var received []byte + + // Start the reader + firstResultCh := make(chan struct{}) + fullResultCh := make(chan struct{}) + go func() { + for { + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + if err == io.EOF { + t.Logf("EOF") + return + } + + t.Fatalf("failed to decode: %v", err) + } + + if frame.IsHeartbeat() { + continue + } + + received = append(received, frame.Data...) 
+ if reflect.DeepEqual(received, expected[:initialWrites]) { + close(firstResultCh) + } else if reflect.DeepEqual(received, expected) { + close(fullResultCh) + return + } + } + }() + + // Start streaming logs + go func() { + if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil { + t.Fatalf("logs() failed: %v", err) + } + }() + + select { + case <-firstResultCh: + case <-time.After(4 * streamBatchWindow): + t.Fatalf("did not receive data: got %q", string(received)) + } + + // We got the first chunk of data, write out the rest to the next file + // at an index much ahead to check that it is following and detecting + // skips + skipTo := initialWrites + 10 + writeToFile(skipTo, expected[initialWrites:]) + + select { + case <-fullResultCh: + case <-time.After(4 * streamBatchWindow): + t.Fatalf("did not receive data: got %q", string(received)) + } + + // Close the reader + r.Close() + + testutil.WaitForResult(func() (bool, error) { + return wrappedW.Closed, nil + }, func(err error) { + t.Fatalf("connection not closed") + }) + }) +} + +func TestLogs_findClosest(t *testing.T) { + task := "foo" + entries := []*allocdir.AllocFileInfo{ + { + Name: "foo.stdout.0", + Size: 100, + }, + { + Name: "foo.stdout.1", + Size: 100, + }, + { + Name: "foo.stdout.2", + Size: 100, + }, + { + Name: "foo.stdout.3", + Size: 100, + }, + { + Name: "foo.stderr.0", + Size: 100, + }, + { + Name: "foo.stderr.1", + Size: 100, + }, + { + Name: "foo.stderr.2", + Size: 100, + }, + } + + cases := []struct { + Entries []*allocdir.AllocFileInfo + DesiredIdx int64 + DesiredOffset int64 + Task string + LogType string + ExpectedFile string + ExpectedIdx int64 + ExpectedOffset int64 + Error bool + }{ + // Test error cases + { + Entries: nil, + DesiredIdx: 0, + Task: task, + LogType: "stdout", + Error: true, + }, + { + Entries: entries[0:3], + DesiredIdx: 0, + Task: task, + LogType: "stderr", + Error: true, + }, + + // Test begining cases + { + Entries: entries, + DesiredIdx: 0, + Task: task, + LogType: "stdout", + ExpectedFile: entries[0].Name, + ExpectedIdx: 0, + }, + { + // Desired offset should be ignored at edges + Entries: entries, + DesiredIdx: 0, + DesiredOffset: -100, + Task: task, + LogType: "stdout", + ExpectedFile: entries[0].Name, + ExpectedIdx: 0, + ExpectedOffset: 0, + }, + { + // Desired offset should be ignored at edges + Entries: entries, + DesiredIdx: 1, + DesiredOffset: -1000, + Task: task, + LogType: "stdout", + ExpectedFile: entries[0].Name, + ExpectedIdx: 0, + ExpectedOffset: 0, + }, + { + Entries: entries, + DesiredIdx: 0, + Task: task, + LogType: "stderr", + ExpectedFile: entries[4].Name, + ExpectedIdx: 0, + }, + { + Entries: entries, + DesiredIdx: 0, + Task: task, + LogType: "stdout", + ExpectedFile: entries[0].Name, + ExpectedIdx: 0, + }, + + // Test middle cases + { + Entries: entries, + DesiredIdx: 1, + Task: task, + LogType: "stdout", + ExpectedFile: entries[1].Name, + ExpectedIdx: 1, + }, + { + Entries: entries, + DesiredIdx: 1, + DesiredOffset: 10, + Task: task, + LogType: "stdout", + ExpectedFile: entries[1].Name, + ExpectedIdx: 1, + ExpectedOffset: 10, + }, + { + Entries: entries, + DesiredIdx: 1, + DesiredOffset: 110, + Task: task, + LogType: "stdout", + ExpectedFile: entries[2].Name, + ExpectedIdx: 2, + ExpectedOffset: 10, + }, + { + Entries: entries, + DesiredIdx: 1, + Task: task, + LogType: "stderr", + ExpectedFile: entries[5].Name, + ExpectedIdx: 1, + }, + // Test end cases + { + Entries: entries, + DesiredIdx: math.MaxInt64, + Task: task, + LogType: "stdout", + 
ExpectedFile: entries[3].Name, + ExpectedIdx: 3, + }, + { + Entries: entries, + DesiredIdx: math.MaxInt64, + DesiredOffset: math.MaxInt64, + Task: task, + LogType: "stdout", + ExpectedFile: entries[3].Name, + ExpectedIdx: 3, + ExpectedOffset: 100, + }, + { + Entries: entries, + DesiredIdx: math.MaxInt64, + DesiredOffset: -10, + Task: task, + LogType: "stdout", + ExpectedFile: entries[3].Name, + ExpectedIdx: 3, + ExpectedOffset: 90, + }, + { + Entries: entries, + DesiredIdx: math.MaxInt64, + Task: task, + LogType: "stderr", + ExpectedFile: entries[6].Name, + ExpectedIdx: 2, + }, + } + + for i, c := range cases { + entry, idx, offset, err := findClosest(c.Entries, c.DesiredIdx, c.DesiredOffset, c.Task, c.LogType) + if err != nil { + if !c.Error { + t.Fatalf("case %d: Unexpected error: %v", i, err) + } + continue + } + + if entry.Name != c.ExpectedFile { + t.Fatalf("case %d: Got file %q; want %q", i, entry.Name, c.ExpectedFile) + } + if idx != c.ExpectedIdx { + t.Fatalf("case %d: Got index %d; want %d", i, idx, c.ExpectedIdx) + } + if offset != c.ExpectedOffset { + t.Fatalf("case %d: Got offset %d; want %d", i, offset, c.ExpectedOffset) + } + } +} diff --git a/command/agent/http.go b/command/agent/http.go index 5d6c42aa00e8..197effe481ca 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -60,7 +60,7 @@ func NewHTTPServer(agent *Agent, config *Config, logOutput io.Writer) (*HTTPServ agent: agent, mux: mux, listener: ln, - logger: log.New(logOutput, "", log.LstdFlags), + logger: agent.logger, addr: ln.Addr().String(), } srv.registerHandlers(config.EnableDebug) diff --git a/command/fs.go b/command/fs.go index e2f99bc0e46d..df1bbf91d08f 100644 --- a/command/fs.go +++ b/command/fs.go @@ -50,7 +50,7 @@ FS Specific Options: Show full information. -job - Use a random allocation from a specified job-id. + Use a random allocation from the specified job ID. -stat Show file stat information instead of displaying the file, or listing the directory. @@ -60,12 +60,12 @@ FS Specific Options: rather to wait for additional output. -tail - Show the files contents with offsets relative to the end of the file. If no - offset is given, -n is defaulted to 10. + Show the files contents with offsets relative to the end of the file. If no + offset is given, -n is defaulted to 10. -n - Sets the tail location in best-efforted number of lines relative to the end - of the file. + Sets the tail location in best-efforted number of lines relative to the end + of the file. -c Sets the tail location in number of bytes relative to the end of the file. @@ -81,7 +81,7 @@ func (f *FSCommand) Run(args []string) int { var verbose, machine, job, stat, tail, follow bool var numLines, numBytes int64 - flags := f.Meta.FlagSet("fs-list", FlagSetClient) + flags := f.Meta.FlagSet("fs", FlagSetClient) flags.Usage = func() { f.Ui.Output(f.Help()) } flags.BoolVar(&verbose, "verbose", false, "") flags.BoolVar(&machine, "H", false, "") @@ -177,14 +177,6 @@ func (f *FSCommand) Run(args []string) int { return 1 } - if alloc.DesiredStatus == "failed" { - allocID := limit(alloc.ID, length) - msg := fmt.Sprintf(`The allocation %q failed to be placed. 
To see the cause, run: -nomad alloc-status %s`, allocID, allocID) - f.Ui.Error(msg) - return 0 - } - // Get file stat info file, _, err := client.AllocFS().Stat(alloc, path, nil) if err != nil { @@ -290,7 +282,7 @@ nomad alloc-status %s`, allocID, allocID) // If numLines is set, wrap the reader if numLines != -1 { - r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 0) } } @@ -329,7 +321,7 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, // If numLines is set, wrap the reader if numLines != -1 { - r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 0) } go func() { @@ -337,9 +329,6 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, // End the streaming r.Close() - - // Output the last offset - f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frameReader.Offset())) }() return r, nil diff --git a/command/helpers.go b/command/helpers.go index 36f3455cc453..b9c4a6be7af2 100644 --- a/command/helpers.go +++ b/command/helpers.go @@ -105,17 +105,26 @@ type LineLimitReader struct { lines int searchLimit int + timeLimit time.Duration + lastRead time.Time + buffer *bytes.Buffer bufFiled bool foundLines bool } // NewLineLimitReader takes the ReadCloser to wrap, the number of lines to find -// searching backwards in the first searchLimit bytes. -func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int) *LineLimitReader { +// searching backwards in the first searchLimit bytes. timeLimit can optionally +// be specified by passing a non-zero duration. When set, the search for the +// last n lines is aborted if no data has been read in the duration. This +// can be used to flush what is had if no extra data is being received. When +// used, the underlying reader must not block forever and must periodically +// unblock even when no data has been read. +func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int, timeLimit time.Duration) *LineLimitReader { return &LineLimitReader{ ReadCloser: r, searchLimit: searchLimit, + timeLimit: timeLimit, lines: lines, buffer: bytes.NewBuffer(make([]byte, 0, searchLimit)), } @@ -124,14 +133,52 @@ func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int) *LineLimitReade func (l *LineLimitReader) Read(p []byte) (n int, err error) { // Fill up the buffer so we can find the correct number of lines. 
if !l.bufFiled { - _, err := l.buffer.ReadFrom(io.LimitReader(l.ReadCloser, int64(l.searchLimit))) + b := make([]byte, len(p)) + n, err := l.ReadCloser.Read(b) + if n > 0 { + if _, err := l.buffer.Write(b[:n]); err != nil { + return 0, err + } + } + if err != nil { - return 0, err + if err != io.EOF { + return 0, err + } + + l.bufFiled = true + goto READ + } + + if l.buffer.Len() >= l.searchLimit { + l.bufFiled = true + goto READ + } + + if l.timeLimit.Nanoseconds() > 0 { + if l.lastRead.IsZero() { + l.lastRead = time.Now() + return 0, nil + } + + now := time.Now() + if n == 0 { + // We hit the limit + if l.lastRead.Add(l.timeLimit).Before(now) { + l.bufFiled = true + goto READ + } else { + return 0, nil + } + } else { + l.lastRead = now + } } - l.bufFiled = true + return 0, nil } +READ: if l.bufFiled && l.buffer.Len() != 0 { b := l.buffer.Bytes() diff --git a/command/helpers_test.go b/command/helpers_test.go index c2dd64e821c6..f43e701e7749 100644 --- a/command/helpers_test.go +++ b/command/helpers_test.go @@ -1,9 +1,12 @@ package command import ( + "io" "io/ioutil" + "reflect" "strings" "testing" + "time" "github.com/mitchellh/cli" ) @@ -48,7 +51,7 @@ func TestHelpers_NodeID(t *testing.T) { } } -func TestHelpers_LineLimitReader(t *testing.T) { +func TestHelpers_LineLimitReader_NoTimeLimit(t *testing.T) { helloString := `hello world this @@ -114,7 +117,7 @@ test`, for i, c := range cases { in := ioutil.NopCloser(strings.NewReader(c.Input)) - limit := NewLineLimitReader(in, c.Lines, c.SearchLimit) + limit := NewLineLimitReader(in, c.Lines, c.SearchLimit, 0) outBytes, err := ioutil.ReadAll(limit) if err != nil { t.Fatalf("case %d failed: %v", i, err) @@ -126,3 +129,59 @@ test`, } } } + +type testReadCloser struct { + data chan []byte +} + +func (t *testReadCloser) Read(p []byte) (n int, err error) { + select { + case b, ok := <-t.data: + if !ok { + return 0, io.EOF + } + + return copy(p, b), nil + case <-time.After(10 * time.Millisecond): + return 0, nil + } +} + +func (t *testReadCloser) Close() error { + close(t.data) + return nil +} + +func TestHelpers_LineLimitReader_TimeLimit(t *testing.T) { + // Create the test reader + in := &testReadCloser{data: make(chan []byte)} + + // Set up the reader such that it won't hit the line/buffer limit and could + // only terminate if it hits the time limit + limit := NewLineLimitReader(in, 1000, 1000, 100*time.Millisecond) + + expected := []byte("hello world") + + resultCh := make(chan struct{}) + go func() { + outBytes, err := ioutil.ReadAll(limit) + if err != nil { + t.Fatalf("ReadAll failed: %v", err) + } + + if reflect.DeepEqual(outBytes, expected) { + close(resultCh) + return + } + }() + + // Send the data + in.data <- expected + in.Close() + + select { + case <-resultCh: + case <-time.After(1 * time.Second): + t.Fatalf("did not exit by time limit") + } +} diff --git a/command/logs.go b/command/logs.go new file mode 100644 index 000000000000..6f105202c0f5 --- /dev/null +++ b/command/logs.go @@ -0,0 +1,268 @@ +package command + +import ( + "fmt" + "io" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "github.com/hashicorp/nomad/api" +) + +type LogsCommand struct { + Meta +} + +func (l *LogsCommand) Help() string { + helpText := ` +Usage: nomad logs [options] + + Streams the stdout/stderr of the given allocation and task. + +General Options: + + ` + generalOptionsUsage() + ` + +Logs Specific Options: + + -verbose + Show full information. + + -job + Use a random allocation from the specified job ID. 
+ + -f + Causes the output to not stop when the end of the logs are reached, but + rather to wait for additional output. + + -tail + Show the files contents with offsets relative to the end of the file. If no + offset is given, -n is defaulted to 10. + + -n + Sets the tail location in best-efforted number of lines relative to the end + of the file. + + -c + Sets the tail location in number of bytes relative to the end of the file. + ` + return strings.TrimSpace(helpText) +} + +func (l *LogsCommand) Synopsis() string { + return "Streams the logs of a task." +} + +func (l *LogsCommand) Run(args []string) int { + var verbose, job, tail, stderr, follow bool + var numLines, numBytes int64 + + flags := l.Meta.FlagSet("logs", FlagSetClient) + flags.Usage = func() { l.Ui.Output(l.Help()) } + flags.BoolVar(&verbose, "verbose", false, "") + flags.BoolVar(&job, "job", false, "") + flags.BoolVar(&tail, "tail", false, "") + flags.BoolVar(&follow, "f", false, "") + flags.BoolVar(&stderr, "stderr", false, "") + flags.Int64Var(&numLines, "n", -1, "") + flags.Int64Var(&numBytes, "c", -1, "") + + if err := flags.Parse(args); err != nil { + return 1 + } + args = flags.Args() + + if numArgs := len(args); numArgs < 1 { + if job { + l.Ui.Error("Job ID required. See help:\n") + } else { + l.Ui.Error("Allocation ID required. See help:\n") + } + + l.Ui.Error(l.Help()) + return 1 + } else if numArgs > 2 { + l.Ui.Error(l.Help()) + return 1 + } + + client, err := l.Meta.Client() + if err != nil { + l.Ui.Error(fmt.Sprintf("Error initializing client: %v", err)) + return 1 + } + + // If -job is specified, use random allocation, otherwise use provided allocation + allocID := args[0] + if job { + allocID, err = getRandomJobAlloc(client, args[0]) + if err != nil { + l.Ui.Error(fmt.Sprintf("Error fetching allocations: %v", err)) + return 1 + } + } + + // Truncate the id unless full length is requested + length := shortId + if verbose { + length = fullId + } + // Query the allocation info + if len(allocID) == 1 { + l.Ui.Error(fmt.Sprintf("Alloc ID must contain at least two characters.")) + return 1 + } + if len(allocID)%2 == 1 { + // Identifiers must be of even length, so we strip off the last byte + // to provide a consistent user experience. 
+ allocID = allocID[:len(allocID)-1] + } + + allocs, _, err := client.Allocations().PrefixList(allocID) + if err != nil { + l.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err)) + return 1 + } + if len(allocs) == 0 { + l.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID)) + return 1 + } + if len(allocs) > 1 { + // Format the allocs + out := make([]string, len(allocs)+1) + out[0] = "ID|Eval ID|Job ID|Task Group|Desired Status|Client Status" + for i, alloc := range allocs { + out[i+1] = fmt.Sprintf("%s|%s|%s|%s|%s|%s", + limit(alloc.ID, length), + limit(alloc.EvalID, length), + alloc.JobID, + alloc.TaskGroup, + alloc.DesiredStatus, + alloc.ClientStatus, + ) + } + l.Ui.Output(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", formatList(out))) + return 0 + } + // Prefix lookup matched a single allocation + alloc, _, err := client.Allocations().Info(allocs[0].ID, nil) + if err != nil { + l.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err)) + return 1 + } + + var task string + if len(args) >= 2 { + task = args[1] + if task == "" { + l.Ui.Error("Task name required") + return 1 + } + + } else { + // Try to determine the tasks name from the allocation + var tasks []*api.Task + for _, tg := range alloc.Job.TaskGroups { + if tg.Name == alloc.TaskGroup { + if len(tg.Tasks) == 1 { + task = tg.Tasks[0].Name + break + } + + tasks = tg.Tasks + break + } + } + + if task == "" { + l.Ui.Error(fmt.Sprintf("Allocation %q is running the following tasks:", limit(alloc.ID, length))) + for _, t := range tasks { + l.Ui.Error(fmt.Sprintf(" * %s", t.Name)) + } + l.Ui.Error("\nPlease specify the task.") + return 1 + } + } + + logType := "stdout" + if stderr { + logType = "stderr" + } + + // We have a file, output it. + var r io.ReadCloser + var readErr error + if !tail { + r, readErr = l.followFile(client, alloc, follow, task, logType, api.OriginStart, 0) + if readErr != nil { + readErr = fmt.Errorf("Error reading file: %v", readErr) + } + } else { + // Parse the offset + var offset int64 = defaultTailLines * bytesToLines + + if nLines, nBytes := numLines != -1, numBytes != -1; nLines && nBytes { + l.Ui.Error("Both -n and -c set") + return 1 + } else if nLines { + offset = numLines * bytesToLines + } else if nBytes { + offset = numBytes + } else { + numLines = defaultTailLines + } + + r, readErr = l.followFile(client, alloc, follow, task, logType, api.OriginEnd, offset) + + // If numLines is set, wrap the reader + if numLines != -1 { + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 1*time.Second) + } + + if readErr != nil { + readErr = fmt.Errorf("Error tailing file: %v", readErr) + } + } + + defer r.Close() + if readErr != nil { + l.Ui.Error(readErr.Error()) + return 1 + } + + io.Copy(os.Stdout, r) + return 0 +} + +// followFile outputs the contents of the file to stdout relative to the end of +// the file. 
+func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation, + follow bool, task, logType, origin string, offset int64) (io.ReadCloser, error) { + + cancel := make(chan struct{}) + frames, _, err := client.AllocFS().Logs(alloc, follow, task, logType, origin, offset, cancel, nil) + if err != nil { + panic(err.Error()) + return nil, err + } + signalCh := make(chan os.Signal, 1) + signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) + + // Create a reader + var r io.ReadCloser + frameReader := api.NewFrameReader(frames, cancel) + frameReader.SetUnblockTime(500 * time.Millisecond) + r = frameReader + + go func() { + <-signalCh + + // End the streaming + r.Close() + }() + + return r, nil +} diff --git a/command/logs_test.go b/command/logs_test.go new file mode 100644 index 000000000000..2400b6302d0b --- /dev/null +++ b/command/logs_test.go @@ -0,0 +1,65 @@ +package command + +import ( + "strings" + "testing" + + "github.com/mitchellh/cli" +) + +func TestLogsCommand_Implements(t *testing.T) { + var _ cli.Command = &LogsCommand{} +} + +func TestLogsCommand_Fails(t *testing.T) { + srv, _, url := testServer(t, nil) + defer srv.Stop() + + ui := new(cli.MockUi) + cmd := &LogsCommand{Meta: Meta{Ui: ui}} + + // Fails on misuse + if code := cmd.Run([]string{"some", "bad", "args"}); code != 1 { + t.Fatalf("expected exit code 1, got: %d", code) + } + if out := ui.ErrorWriter.String(); !strings.Contains(out, cmd.Help()) { + t.Fatalf("expected help output, got: %s", out) + } + ui.ErrorWriter.Reset() + + // Fails on connection failure + if code := cmd.Run([]string{"-address=nope", "foobar"}); code != 1 { + t.Fatalf("expected exit code 1, got: %d", code) + } + if out := ui.ErrorWriter.String(); !strings.Contains(out, "Error querying allocation") { + t.Fatalf("expected failed query error, got: %s", out) + } + ui.ErrorWriter.Reset() + + // Fails on missing alloc + if code := cmd.Run([]string{"-address=" + url, "26470238-5CF2-438F-8772-DC67CFB0705C"}); code != 1 { + t.Fatalf("expected exit 1, got: %d", code) + } + if out := ui.ErrorWriter.String(); !strings.Contains(out, "No allocation(s) with prefix or id") { + t.Fatalf("expected not found error, got: %s", out) + } + ui.ErrorWriter.Reset() + + // Fail on identifier with too few characters + if code := cmd.Run([]string{"-address=" + url, "2"}); code != 1 { + t.Fatalf("expected exit 1, got: %d", code) + } + if out := ui.ErrorWriter.String(); !strings.Contains(out, "must contain at least two characters.") { + t.Fatalf("expected too few characters error, got: %s", out) + } + ui.ErrorWriter.Reset() + + // Identifiers with uneven length should produce a query result + if code := cmd.Run([]string{"-address=" + url, "123"}); code != 1 { + t.Fatalf("expected exit 1, got: %d", code) + } + if out := ui.ErrorWriter.String(); !strings.Contains(out, "No allocation(s) with prefix or id") { + t.Fatalf("expected not found error, got: %s", out) + } + +} diff --git a/command/plan.go b/command/plan.go index aa85aceb0742..4b404cfd48e6 100644 --- a/command/plan.go +++ b/command/plan.go @@ -63,8 +63,8 @@ General Options: Plan Options: -diff - Determines whether the diff between the remote job and planned job is shown. - Defaults to true. + Determines whether the diff between the remote job and planned job is shown. + Defaults to true. -verbose Increase diff verbosity. 
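The logs endpoint added above can also be consumed without the api package. Below is a minimal sketch of a raw HTTP client against the surface defined in command/agent/fs_endpoint.go; the agent address and port and the task name "web" are placeholders, the allocation ID is read from the command line, and error handling is pared down.

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Placeholders: the client node's advertised HTTP address and the task
	// name; the allocation ID is taken from the first argument.
	v := url.Values{}
	v.Set("task", "web")
	v.Set("type", "stdout")
	v.Set("follow", "false")
	v.Set("origin", "start")
	v.Set("offset", "0")
	addr := fmt.Sprintf("http://127.0.0.1:4646/v1/client/fs/logs/%s?%s", os.Args[1], v.Encode())

	resp, err := http.Get(addr)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer resp.Body.Close()

	// The handler streams JSON-encoded StreamFrames; frames carrying no
	// content are heartbeats that only keep the connection alive.
	dec := json.NewDecoder(resp.Body)
	for {
		var frame api.StreamFrame
		if err := dec.Decode(&frame); err != nil {
			if err != io.EOF {
				fmt.Fprintln(os.Stderr, err)
			}
			return
		}
		if frame.IsHeartbeat() {
			continue
		}
		os.Stdout.Write(frame.Data)
	}
}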
diff --git a/command/run.go b/command/run.go index c983750ac846..e0ed219a5a40 100644 --- a/command/run.go +++ b/command/run.go @@ -62,17 +62,17 @@ General Options: Run Options: -check-index - If set, the job is only registered or updated if the the passed - job modify index matches the server side version. If a check-index value of - zero is passed, the job is only registered if it does not yet exist. If a - non-zero value is passed, it ensures that the job is being updated from a - known state. The use of this flag is most common in conjunction with plan - command. + If set, the job is only registered or updated if the the passed + job modify index matches the server side version. If a check-index value of + zero is passed, the job is only registered if it does not yet exist. If a + non-zero value is passed, it ensures that the job is being updated from a + known state. The use of this flag is most common in conjunction with plan + command. -detach - Return immediately instead of entering monitor mode. After job submission, - the evaluation ID will be printed to the screen, which can be used to - examine the evaluation using the eval-status command. + Return immediately instead of entering monitor mode. After job submission, + the evaluation ID will be printed to the screen, which can be used to + examine the evaluation using the eval-status command. -verbose Display full information. diff --git a/command/stop.go b/command/stop.go index 3a1625397c49..1f19baae56af 100644 --- a/command/stop.go +++ b/command/stop.go @@ -27,9 +27,9 @@ Stop Options: -detach Return immediately instead of entering monitor mode. After the - deregister command is submitted, a new evaluation ID is printed to the - screen, which can be used to examine the evaluation using the eval-status - command. + deregister command is submitted, a new evaluation ID is printed to the + screen, which can be used to examine the evaluation using the eval-status + command. -yes Automatic yes to prompts. diff --git a/commands.go b/commands.go index 69d96273ffd3..acef7f16c670 100644 --- a/commands.go +++ b/commands.go @@ -79,6 +79,11 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "logs": func() (cli.Command, error) { + return &command.LogsCommand{ + Meta: meta, + }, nil + }, "node-drain": func() (cli.Command, error) { return &command.NodeDrainCommand{ Meta: meta,
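Taken together, the client-side pieces added in api/fs.go compose roughly as follows. This is a minimal sketch rather than the CLI's implementation: it assumes an agent reachable at the default address, takes an allocation ID as the first argument, and uses the placeholder task name "example".

package main

import (
	"fmt"
	"io"
	"os"
	"os/signal"
	"time"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	// Look up the allocation by the ID passed as the first argument.
	alloc, _, err := client.Allocations().Info(os.Args[1], nil)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	// Follow the task's stdout from the start of its logs. Closing the cancel
	// channel (via FrameReader.Close below) ends the stream.
	cancel := make(chan struct{})
	frames, _, err := client.AllocFS().Logs(alloc, true, "example", "stdout", api.OriginStart, 0, cancel, nil)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	// Convert the frame stream into an io.ReadCloser. The unblock time lets
	// Read return periodically with no data instead of blocking forever.
	r := api.NewFrameReader(frames, cancel)
	r.SetUnblockTime(500 * time.Millisecond)
	defer r.Close()

	// Stop on Ctrl-C; Close cancels the in-flight request through the cancel
	// channel, the same pattern command/logs.go uses.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt)
	go func() {
		<-sigCh
		r.Close()
	}()

	io.Copy(os.Stdout, r)
}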