From cc0fec53f1e6b5df709085e791bf051183dfce9a Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 18 Jul 2016 09:48:29 -0700 Subject: [PATCH 01/16] initial log api impl --- client/allocdir/alloc_dir.go | 11 +- command/agent/fs_endpoint.go | 209 +++++++++++++++++++++++++++++++++-- 2 files changed, 210 insertions(+), 10 deletions(-) diff --git a/client/allocdir/alloc_dir.go b/client/allocdir/alloc_dir.go index 9f6c44c923b0..514f05cfbd6a 100644 --- a/client/allocdir/alloc_dir.go +++ b/client/allocdir/alloc_dir.go @@ -61,7 +61,7 @@ type AllocDirFS interface { List(path string) ([]*AllocFileInfo, error) Stat(path string) (*AllocFileInfo, error) ReadAt(path string, offset int64) (io.ReadCloser, error) - BlockUntilExists(path string, t *tomb.Tomb) error + BlockUntilExists(path string, t *tomb.Tomb) chan error ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error) } @@ -343,11 +343,16 @@ func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) { // BlockUntilExists blocks until the passed file relative the allocation // directory exists. The block can be cancelled with the passed tomb. -func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) error { +func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) chan error { // Get the path relative to the alloc directory p := filepath.Join(d.AllocDir, path) watcher := getFileWatcher(p) - return watcher.BlockUntilExists(t) + returnCh := make(chan error, 1) + go func() { + returnCh <- watcher.BlockUntilExists(t) + close(returnCh) + }() + return returnCh } // ChangeEvents watches for changes to the passed path relative to the diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 6375f4bd5971..68b08ed457ce 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -4,7 +4,10 @@ import ( "bytes" "fmt" "io" + "math" "net/http" + "os" + "path/filepath" "strconv" "strings" "sync" @@ -21,6 +24,8 @@ import ( var ( allocIDNotPresentErr = fmt.Errorf("must provide a valid alloc id") fileNameNotPresentErr = fmt.Errorf("must provide a file name") + taskNotPresentErr = fmt.Errorf("must provide task name") + logTypeNotPresentErr = fmt.Errorf("must provide log type (stdout/stderr)") clientNotRunning = fmt.Errorf("node is not running a Nomad Client") invalidOrigin = fmt.Errorf("origin must be start or end") ) @@ -58,6 +63,8 @@ func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (int return s.FileCatRequest(resp, req) case strings.HasPrefix(path, "stream/"): return s.Stream(resp, req) + case strings.HasPrefix(path, "logs/"): + return s.Logs(resp, req) default: return nil, CodedError(404, ErrInvalidMethod) } @@ -499,10 +506,18 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf // Create an output that gets flushed on every write output := ioutils.NewWriteFlusher(resp) - return nil, s.stream(offset, path, fs, output) + // Create the framer + framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + + return nil, s.stream(offset, path, fs, framer, nil) } -func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, output io.WriteCloser) error { +func (s *HTTPServer) stream(offset int64, path string, + fs allocdir.AllocDirFS, framer *StreamFramer, + eofCancelCh chan error) error { + // Get the reader f, err := fs.ReadAt(path, offset) if err != nil { @@ -517,11 +532,6 @@ func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, o t.Done() }() - // Create the framer - framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) - framer.Run() - defer framer.Destroy() - // Create a variable to allow setting the last event var lastEvent string @@ -595,9 +605,194 @@ OUTER: continue OUTER case <-framer.ExitCh(): return nil + case err := <-eofCancelCh: + return err } } } return nil } + +// Logs streams the content of a log blocking on EOF. The parameters are: +// * task: task name to stream logs for. +// * type: stdout/stderr to stream. +// * offset: The offset to start streaming data at, defaults to zero. +// * origin: Either "start" or "end" and defines from where the offset is +// applied. Defaults to "start". +func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + var allocID, task, logType string + var err error + + q := req.URL.Query() + + if allocID = strings.TrimPrefix(req.URL.Path, "/v1/client/fs/logs/"); allocID == "" { + return nil, allocIDNotPresentErr + } + + if task = q.Get("task"); task == "" { + return nil, taskNotPresentErr + } + + logType = q.Get("type") + switch logType { + case "stdout", "stderr": + default: + return nil, logTypeNotPresentErr + } + + var offset int64 + offsetString := q.Get("offset") + if offsetString != "" { + var err error + if offset, err = strconv.ParseInt(offsetString, 10, 64); err != nil { + return nil, fmt.Errorf("error parsing offset: %v", err) + } + } + + origin := q.Get("origin") + switch origin { + case "start", "end": + case "": + origin = "start" + default: + return nil, invalidOrigin + } + + fs, err := s.agent.client.GetAllocFS(allocID) + if err != nil { + return nil, err + } + + // Create an output that gets flushed on every write + output := ioutils.NewWriteFlusher(resp) + + return nil, s.logs(offset, origin, task, logType, fs, output) +} + +func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdir.AllocDirFS, output io.WriteCloser) error { + // Create the framer + framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + + // Path to the logs + logPath := filepath.Join(allocdir.SharedAllocName, allocdir.LogDirName) + + // nextIdx is the next index to read logs from + var nextIdx int64 + switch origin { + case "start": + nextIdx = 0 + case "end": + nextIdx = math.MaxInt64 + offset *= -1 + default: + return invalidOrigin + } + + // Create a tomb to cancel watch events + t := tomb.Tomb{} + defer func() { + t.Kill(nil) + t.Done() + }() + + for { + // Logic for picking next file is: + // 1) List log files + // 2) Pick log file closest to desired index + // 3) Open log file at correct offset + // 3a) No error, read contents + // 3b) If file doesn't exist, goto 1 as it may have been rotated out + entries, err := fs.List(logPath) + if err != nil { + return fmt.Errorf("failed to list entries: %v", err) + } + + logEntry, idx, err := findClosest(entries, nextIdx, task, logType) + if err != nil { + return err + } + + // Apply the offset we should open at. Handling the negative case is + // only for the first time. + openOffset := offset + if openOffset < 0 { + openOffset = logEntry.Size + openOffset + if openOffset < 0 { + openOffset = 0 + } + } + + p := filepath.Join(logPath, logEntry.Name) + nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1)) + nextExists := fs.BlockUntilExists(nextPath, &t) + err = s.stream(openOffset, p, fs, framer, nextExists) + + // Check if there was an error where the file does not exist. That means + // it got rotated out from under us. + if err != nil { + if os.IsNotExist(err) { + continue + } + return err + } + + //Since we successfully streamed, update the overall offset/idx. + offset = int64(0) + idx++ + } + + return nil +} + +func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx int64, + task, logType string) (*allocdir.AllocFileInfo, int64, error) { + + if len(entries) == 0 { + return nil, 0, fmt.Errorf("no file entries found") + } + + prefix := fmt.Sprintf("%s.%s.", task, logType) + + var closest *allocdir.AllocFileInfo + var closestIdx int64 + closestDist := int64(math.MaxInt64) + for _, entry := range entries { + if entry.IsDir { + continue + } + + idxStr := strings.TrimPrefix(entry.Name, prefix) + + // If nothing was trimmed, then it is not a match + if idxStr == entry.Name { + continue + } + + // Convert to an int + idx, err := strconv.Atoi(idxStr) + if err != nil { + return nil, 0, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err) + } + + // Determine distance to desired + d := desiredIdx - int64(idx) + if d < 0 { + d *= -1 + } + + if d < closestDist { + closestDist = d + closest = entry + closestIdx = int64(idx) + } + } + + if closest == nil { + return nil, 0, fmt.Errorf("log entry for task %q and log type %q not found", task, logType) + } + + return closest, closestIdx, nil +} From 2530f7bd403215b7e91d15e803d0b618dbf8dc40 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 18 Jul 2016 11:39:38 -0700 Subject: [PATCH 02/16] initial api and command --- api/fs.go | 81 ++++++++++++ command/agent/fs_endpoint.go | 2 +- command/fs.go | 10 +- command/logs.go | 237 +++++++++++++++++++++++++++++++++++ commands.go | 5 + 5 files changed, 325 insertions(+), 10 deletions(-) create mode 100644 command/logs.go diff --git a/api/fs.go b/api/fs.go index 36458c694d7c..4528b36b065b 100644 --- a/api/fs.go +++ b/api/fs.go @@ -204,6 +204,7 @@ func (a *AllocFS) getErrorMsg(resp *http.Response) error { // * path: path to file to stream. // * offset: The offset to start streaming data at. // * origin: Either "start" or "end" and defines from where the offset is applied. +// * cancel: A channel that when closed, streaming will end. // // The return value is a channel that will emit StreamFrames as they are read. func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, @@ -275,6 +276,86 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, return frames, nil, nil } +// Logs streams the content of a tasks logs blocking on EOF. +// The parameters are: +// * allocation: the allocation to stream from. +// * task: the tasks name to stream logs for. +// * logType: Either "stdout" or "stderr" +// * offset: The offset to start streaming data at. +// * origin: Either "start" or "end" and defines from where the offset is applied. +// * cancel: A channel that when closed, streaming will end. +// +// The return value is a channel that will emit StreamFrames as they are read. +func (a *AllocFS) Logs(alloc *Allocation, task, logType, origin string, offset int64, + cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, *QueryMeta, error) { + + node, _, err := a.client.Nodes().Info(alloc.NodeID, q) + if err != nil { + return nil, nil, err + } + + if node.HTTPAddr == "" { + return nil, nil, fmt.Errorf("http addr of the node where alloc %q is running is not advertised", alloc.ID) + } + u := &url.URL{ + Scheme: "http", + Host: node.HTTPAddr, + Path: fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID), + } + v := url.Values{} + v.Set("task", task) + v.Set("type", logType) + v.Set("origin", origin) + v.Set("offset", strconv.FormatInt(offset, 10)) + u.RawQuery = v.Encode() + req := &http.Request{ + Method: "GET", + URL: u, + Cancel: cancel, + } + c := http.Client{} + resp, err := c.Do(req) + if err != nil { + return nil, nil, err + } + + // Create the output channel + frames := make(chan *StreamFrame, 10) + + go func() { + // Close the body + defer resp.Body.Close() + + // Create a decoder + dec := json.NewDecoder(resp.Body) + + for { + // Check if we have been cancelled + select { + case <-cancel: + return + default: + } + + // Decode the next frame + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + close(frames) + return + } + + // Discard heartbeat frames + if frame.IsHeartbeat() { + continue + } + + frames <- &frame + } + }() + + return frames, nil, nil +} + // FrameReader is used to convert a stream of frames into a read closer. type FrameReader struct { frames <-chan *StreamFrame diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 68b08ed457ce..35f15be168d6 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -741,7 +741,7 @@ func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdi //Since we successfully streamed, update the overall offset/idx. offset = int64(0) - idx++ + nextIdx++ } return nil diff --git a/command/fs.go b/command/fs.go index e2f99bc0e46d..2cf1b45c9e99 100644 --- a/command/fs.go +++ b/command/fs.go @@ -81,7 +81,7 @@ func (f *FSCommand) Run(args []string) int { var verbose, machine, job, stat, tail, follow bool var numLines, numBytes int64 - flags := f.Meta.FlagSet("fs-list", FlagSetClient) + flags := f.Meta.FlagSet("fs", FlagSetClient) flags.Usage = func() { f.Ui.Output(f.Help()) } flags.BoolVar(&verbose, "verbose", false, "") flags.BoolVar(&machine, "H", false, "") @@ -177,14 +177,6 @@ func (f *FSCommand) Run(args []string) int { return 1 } - if alloc.DesiredStatus == "failed" { - allocID := limit(alloc.ID, length) - msg := fmt.Sprintf(`The allocation %q failed to be placed. To see the cause, run: -nomad alloc-status %s`, allocID, allocID) - f.Ui.Error(msg) - return 0 - } - // Get file stat info file, _, err := client.AllocFS().Stat(alloc, path, nil) if err != nil { diff --git a/command/logs.go b/command/logs.go new file mode 100644 index 000000000000..25edc65db0ed --- /dev/null +++ b/command/logs.go @@ -0,0 +1,237 @@ +package command + +import ( + "fmt" + "io" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/hashicorp/nomad/api" +) + +type LogsCommand struct { + Meta +} + +func (l *LogsCommand) Help() string { + helpText := ` +Usage: nomad logs [options] + +TODO + +General Options: + + ` + generalOptionsUsage() + ` + +Logs Specific Options: + + -verbose + Show full information. + + -job + Use a random allocation from a specified job-id. + + -tail + Show the files contents with offsets relative to the end of the file. If no + offset is given, -n is defaulted to 10. + + -n + Sets the tail location in best-efforted number of lines relative to the end + of the file. + + -c + Sets the tail location in number of bytes relative to the end of the file. +` + return strings.TrimSpace(helpText) +} + +func (l *LogsCommand) Synopsis() string { + return "Inspect the contents of an allocation directory" +} + +func (l *LogsCommand) Run(args []string) int { + var verbose, job, tail, stderr bool + var numLines, numBytes int64 + + flags := l.Meta.FlagSet("logs-list", FlagSetClient) + flags.Usage = func() { l.Ui.Output(l.Help()) } + flags.BoolVar(&verbose, "verbose", false, "") + flags.BoolVar(&job, "job", false, "") + flags.BoolVar(&tail, "tail", false, "") + flags.BoolVar(&stderr, "stderr", false, "") + flags.Int64Var(&numLines, "n", -1, "") + flags.Int64Var(&numBytes, "c", -1, "") + + if err := flags.Parse(args); err != nil { + return 1 + } + args = flags.Args() + + if len(args) < 2 { + if job { + l.Ui.Error("job ID and task name required") + } else { + l.Ui.Error("allocation ID and task name required") + } + + return 1 + } + + task := args[1] + if task == "" { + l.Ui.Error("task name required") + return 1 + } + + client, err := l.Meta.Client() + if err != nil { + l.Ui.Error(fmt.Sprintf("Error initializing client: %v", err)) + return 1 + } + + // If -job is specified, use random allocation, otherwise use provided allocation + allocID := args[0] + if job { + allocID, err = getRandomJobAlloc(client, args[0]) + if err != nil { + l.Ui.Error(fmt.Sprintf("Error fetching allocations: %v", err)) + return 1 + } + } + + // Truncate the id unless full length is requested + length := shortId + if verbose { + length = fullId + } + // Query the allocation info + if len(allocID) == 1 { + l.Ui.Error(fmt.Sprintf("Alloc ID must contain at least two characters.")) + return 1 + } + if len(allocID)%2 == 1 { + // Identifiers must be of even length, so we strip off the last byte + // to provide a consistent user experience. + allocID = allocID[:len(allocID)-1] + } + + allocs, _, err := client.Allocations().PrefixList(allocID) + if err != nil { + l.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err)) + return 1 + } + if len(allocs) == 0 { + l.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID)) + return 1 + } + if len(allocs) > 1 { + // Format the allocs + out := make([]string, len(allocs)+1) + out[0] = "ID|Eval ID|Job ID|Task Group|Desired Status|Client Status" + for i, alloc := range allocs { + out[i+1] = fmt.Sprintf("%s|%s|%s|%s|%s|%s", + limit(alloc.ID, length), + limit(alloc.EvalID, length), + alloc.JobID, + alloc.TaskGroup, + alloc.DesiredStatus, + alloc.ClientStatus, + ) + } + l.Ui.Output(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", formatList(out))) + return 0 + } + // Prefix lookup matched a single allocation + alloc, _, err := client.Allocations().Info(allocs[0].ID, nil) + if err != nil { + l.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err)) + return 1 + } + + logType := "stdout" + if stderr { + logType = "stderr" + } + + // We have a file, output it. + var r io.ReadCloser + var readErr error + if !tail { + r, readErr = l.followFile(client, alloc, task, logType, api.OriginStart, 0, -1) + if readErr != nil { + readErr = fmt.Errorf("Error reading file: %v", readErr) + } + } else { + // Parse the offset + var offset int64 = defaultTailLines * bytesToLines + + if nLines, nBytes := numLines != -1, numBytes != -1; nLines && nBytes { + l.Ui.Error("Both -n and -c set") + return 1 + } else if nLines { + offset = numLines * bytesToLines + } else if nBytes { + offset = numBytes + } else { + numLines = defaultTailLines + } + + r, readErr = l.followFile(client, alloc, task, logType, api.OriginEnd, offset, numLines) + + // If numLines is set, wrap the reader + if numLines != -1 { + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) + } + + if readErr != nil { + readErr = fmt.Errorf("Error tailing file: %v", readErr) + } + } + + defer r.Close() + if readErr != nil { + l.Ui.Error(readErr.Error()) + return 1 + } + + io.Copy(os.Stdout, r) + return 0 +} + +// followFile outputs the contents of the file to stdout relative to the end of +// the file. If numLines does not equal -1, then tail -n behavior is used. +func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation, + task, logType, origin string, offset, numLines int64) (io.ReadCloser, error) { + + cancel := make(chan struct{}) + frames, _, err := client.AllocFS().Logs(alloc, task, logType, origin, offset, cancel, nil) + if err != nil { + return nil, err + } + signalCh := make(chan os.Signal, 1) + signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) + + // Create a reader + var r io.ReadCloser + frameReader := api.NewFrameReader(frames, cancel) + r = frameReader + + // If numLines is set, wrap the reader + if numLines != -1 { + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) + } + + go func() { + <-signalCh + + // End the streaming + r.Close() + + // Output the last offset + l.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frameReader.Offset())) + }() + + return r, nil +} diff --git a/commands.go b/commands.go index 69d96273ffd3..acef7f16c670 100644 --- a/commands.go +++ b/commands.go @@ -79,6 +79,11 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "logs": func() (cli.Command, error) { + return &command.LogsCommand{ + Meta: meta, + }, nil + }, "node-drain": func() (cli.Command, error) { return &command.NodeDrainCommand{ Meta: meta, From 065cf0915af0692817cdf93acad8f6f638fde309 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 18 Jul 2016 18:41:21 -0700 Subject: [PATCH 03/16] tests --- command/agent/fs_endpoint.go | 12 +-- command/agent/fs_endpoint_test.go | 118 +++++++++++++++++++++++++++++- 2 files changed, 121 insertions(+), 9 deletions(-) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 35f15be168d6..2a338da48e3e 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -261,7 +261,6 @@ func (s *StreamFramer) Destroy() { s.l.Lock() wasRunning := s.running s.running = false - s.f = nil close(s.shutdownCh) s.heartbeat.Stop() s.flusher.Stop() @@ -271,6 +270,7 @@ func (s *StreamFramer) Destroy() { if wasRunning { <-s.exitCh } + s.out.Close() } // Run starts a long lived goroutine that handles sending data as well as @@ -299,7 +299,6 @@ func (s *StreamFramer) run() { defer func() { s.l.Lock() s.err = err - s.out.Close() close(s.exitCh) close(s.outbound) s.l.Unlock() @@ -322,8 +321,11 @@ func (s *StreamFramer) run() { // Read the data for the frame, and send it s.f.Data = s.readData() - s.outbound <- s.f - s.f = nil + select { + case s.outbound <- s.f: + s.f = nil + default: + } s.l.Unlock() case <-s.heartbeat.C: @@ -339,7 +341,7 @@ OUTER: case <-s.shutdownCh: break OUTER case o := <-s.outbound: - // Send the frame and then clear the current working frame + // Send the frame if err = s.enc.Encode(o); err != nil { return } diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index aaf1f8b4dfb6..a6e839829f78 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -1,6 +1,7 @@ package agent import ( + "bytes" "io" "io/ioutil" "net/http" @@ -8,6 +9,7 @@ import ( "os" "path/filepath" "reflect" + "strconv" "testing" "time" @@ -302,6 +304,98 @@ func TestStreamFramer_Heartbeat(t *testing.T) { } } +// This test checks that frames are received in order +func TestStreamFramer_Order(t *testing.T) { + // Create the stream framer + r, w := io.Pipe() + wrappedW := &WriteCloseChecker{WriteCloser: w} + // Ensure the batch window doesn't get hit + hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond + sf := NewStreamFramer(wrappedW, hRate, bWindow, 100) + sf.Run() + + // Create a decoder + dec := codec.NewDecoder(r, jsonHandle) + + //files := []string{"1", "2", "3", "4", "5"} + files := []string{"1"} + input := bytes.NewBuffer(make([]byte, 100000)) + for i := 0; i <= 2000; i++ { + str := strconv.Itoa(i) + "\n" + input.WriteString(str) + } + + expected := bytes.NewBuffer(make([]byte, 100000)) + for _, _ = range files { + expected.Write(input.Bytes()) + } + receivedBuf := bytes.NewBuffer(make([]byte, 100000)) + + // Start the reader + resultCh := make(chan struct{}) + go func() { + for { + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + t.Fatalf("failed to decode") + } + + if frame.IsHeartbeat() { + continue + } + + receivedBuf.Write(frame.Data) + + if reflect.DeepEqual(expected, receivedBuf) { + resultCh <- struct{}{} + return + } + } + }() + + // Send the data + b := input.Bytes() + shards := 10 + each := len(b) / shards + for _, f := range files { + for i := 0; i < shards; i++ { + l, r := each*i, each*(i+1) + if i == shards-1 { + r = len(b) + } + + if err := sf.Send(f, "", b[l:r], 0); err != nil { + t.Fatalf("Send() failed %v", err) + } + } + } + + // Ensure we get data + select { + case <-resultCh: + case <-time.After(4 * bWindow): + got := receivedBuf.String() + want := expected.String() + t.Fatalf("Did not receive data in sorted order\nGot:%v\nWant:%v\n", got, want) + } + + // Close the reader and wait. This should cause the runner to exit + if err := r.Close(); err != nil { + t.Fatalf("failed to close reader") + } + + select { + case <-sf.ExitCh(): + case <-time.After(2 * hRate): + t.Fatalf("exit channel should close") + } + + sf.Destroy() + if !wrappedW.Closed { + t.Fatalf("writer not closed") + } +} + func TestHTTP_Stream_MissingParams(t *testing.T) { httpTest(t, nil, func(s *TestServer) { req, err := http.NewRequest("GET", "/v1/client/fs/stream/", nil) @@ -364,7 +458,11 @@ func TestHTTP_Stream_NoFile(t *testing.T) { ad := tempAllocDir(t) defer os.RemoveAll(ad.AllocDir) - if err := s.Server.stream(0, "foo", ad, nopWriteCloser{ioutil.Discard}); err == nil { + framer := NewStreamFramer(nopWriteCloser{ioutil.Discard}, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + + if err := s.Server.stream(0, "foo", ad, framer, nil); err == nil { t.Fatalf("expected an error when streaming unknown file") } }) @@ -419,9 +517,13 @@ func TestHTTP_Stream_Modify(t *testing.T) { t.Fatalf("write failed: %v", err) } + framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + // Start streaming go func() { - if err := s.Server.stream(0, streamFile, ad, w); err != nil { + if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -496,9 +598,13 @@ func TestHTTP_Stream_Truncate(t *testing.T) { t.Fatalf("write failed: %v", err) } + framer := NewStreamFramer(w, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + defer framer.Destroy() + // Start streaming go func() { - if err := s.Server.stream(0, streamFile, ad, w); err != nil { + if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -595,9 +701,12 @@ func TestHTTP_Stream_Delete(t *testing.T) { t.Fatalf("write failed: %v", err) } + framer := NewStreamFramer(wrappedW, streamHeartbeatRate, streamBatchWindow, streamFrameSize) + framer.Run() + // Start streaming go func() { - if err := s.Server.stream(0, streamFile, ad, wrappedW); err != nil { + if err := s.Server.stream(0, streamFile, ad, framer, nil); err != nil { t.Fatalf("stream() failed: %v", err) } }() @@ -615,6 +724,7 @@ func TestHTTP_Stream_Delete(t *testing.T) { t.Fatalf("did not receive delete") } + framer.Destroy() testutil.WaitForResult(func() (bool, error) { return wrappedW.Closed, nil }, func(err error) { From 53f198932f11da0088b34f37cf10337b2c17d8b5 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 19 Jul 2016 10:04:57 -0700 Subject: [PATCH 04/16] Fix buffer reuse --- command/agent/fs_endpoint.go | 72 ++++++++++++++++++------------- command/agent/fs_endpoint_test.go | 32 +++++++++----- command/agent/http.go | 2 +- 3 files changed, 65 insertions(+), 41 deletions(-) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 2a338da48e3e..37a4c1df4049 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -36,7 +36,7 @@ const ( // streamHeartbeatRate is the rate at which a heartbeat will occur to detect // a closed connection without sending any additional data - streamHeartbeatRate = 10 * time.Second + streamHeartbeatRate = 1 * time.Second // streamBatchWindow is the window in which file content is batched before // being flushed if the frame size has not been hit. @@ -230,7 +230,7 @@ type StreamFramer struct { // Captures whether the framer is running and any error that occurred to // cause it to stop. running bool - err error + Err error } // NewStreamFramer creates a new stream framer that will output StreamFrames to @@ -259,15 +259,13 @@ func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duratio // Destroy is used to cleanup the StreamFramer and flush any pending frames func (s *StreamFramer) Destroy() { s.l.Lock() - wasRunning := s.running - s.running = false close(s.shutdownCh) s.heartbeat.Stop() s.flusher.Stop() s.l.Unlock() // Ensure things were flushed - if wasRunning { + if s.running { <-s.exitCh } s.out.Close() @@ -298,9 +296,10 @@ func (s *StreamFramer) run() { var err error defer func() { s.l.Lock() - s.err = err + s.Err = err close(s.exitCh) close(s.outbound) + s.running = false s.l.Unlock() }() @@ -309,6 +308,8 @@ func (s *StreamFramer) run() { go func() { for { select { + case <-s.exitCh: + return case <-s.shutdownCh: return case <-s.flusher.C: @@ -321,16 +322,15 @@ func (s *StreamFramer) run() { // Read the data for the frame, and send it s.f.Data = s.readData() - select { - case s.outbound <- s.f: - s.f = nil - default: - } - + s.outbound <- s.f + s.f = nil s.l.Unlock() case <-s.heartbeat.C: // Send a heartbeat frame - s.outbound <- &StreamFrame{} + select { + case s.outbound <- &StreamFrame{}: + default: + } } } }() @@ -350,11 +350,11 @@ OUTER: // Flush any existing frames s.l.Lock() - defer s.l.Unlock() select { case o := <-s.outbound: // Send the frame and then clear the current working frame if err = s.enc.Encode(o); err != nil { + s.l.Unlock() return } default: @@ -364,6 +364,7 @@ OUTER: s.f.Data = s.readData() s.enc.Encode(s.f) } + s.l.Unlock() } // readData is a helper which reads the buffered data returning up to the frame @@ -378,7 +379,10 @@ func (s *StreamFramer) readData() []byte { if size == 0 { return nil } - return s.data.Next(size) + d := s.data.Next(size) + b := make([]byte, size) + copy(b, d) + return b } // Send creates and sends a StreamFrame based on the passed parameters. An error @@ -391,8 +395,8 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e // If we are not running, return the error that caused us to not run or // indicated that it was never started. if !s.running { - if s.err != nil { - return s.err + if s.Err != nil { + return s.Err } return fmt.Errorf("StreamFramer not running") } @@ -400,13 +404,14 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e // Check if not mergeable if s.f != nil && (s.f.File != file || s.f.FileEvent != fileEvent) { // Flush the old frame - s.outbound <- &StreamFrame{ - Offset: s.f.Offset, - File: s.f.File, - FileEvent: s.f.FileEvent, - Data: s.readData(), + f := *s.f + f.Data = s.readData() + select { + case <-s.exitCh: + return nil + case s.outbound <- &f: + s.f = nil } - s.f = nil } // Store the new data as the current frame. @@ -423,21 +428,30 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e // Handle the delete case in which there is no data if s.data.Len() == 0 && s.f.FileEvent != "" { - s.outbound <- &StreamFrame{ + select { + case <-s.exitCh: + return nil + case s.outbound <- &StreamFrame{ Offset: s.f.Offset, File: s.f.File, FileEvent: s.f.FileEvent, + }: } } // Flush till we are under the max frame size for s.data.Len() >= s.frameSize { // Create a new frame to send it - s.outbound <- &StreamFrame{ + d := s.readData() + select { + case <-s.exitCh: + return nil + case s.outbound <- &StreamFrame{ Offset: s.f.Offset, File: s.f.File, FileEvent: s.f.FileEvent, - Data: s.readData(), + Data: d, + }: } } @@ -743,7 +757,7 @@ func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdi //Since we successfully streamed, update the overall offset/idx. offset = int64(0) - nextIdx++ + nextIdx = idx + 1 } return nil @@ -759,7 +773,7 @@ func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx int64, prefix := fmt.Sprintf("%s.%s.", task, logType) var closest *allocdir.AllocFileInfo - var closestIdx int64 + closestIdx := int64(math.MaxInt64) closestDist := int64(math.MaxInt64) for _, entry := range entries { if entry.IsDir { @@ -785,7 +799,7 @@ func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx int64, d *= -1 } - if d < closestDist { + if d <= closestDist && (int64(idx) < closestIdx || int64(idx) == desiredIdx) { closestDist = d closest = entry closestIdx = int64(idx) diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index a6e839829f78..c8c6d827fd45 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -10,11 +10,13 @@ import ( "path/filepath" "reflect" "strconv" + "strings" "testing" "time" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/testutil" + "github.com/pmezard/go-difflib/difflib" "github.com/ugorji/go/codec" ) @@ -310,26 +312,25 @@ func TestStreamFramer_Order(t *testing.T) { r, w := io.Pipe() wrappedW := &WriteCloseChecker{WriteCloser: w} // Ensure the batch window doesn't get hit - hRate, bWindow := 100*time.Millisecond, 100*time.Millisecond - sf := NewStreamFramer(wrappedW, hRate, bWindow, 100) + hRate, bWindow := 100*time.Millisecond, 10*time.Millisecond + sf := NewStreamFramer(wrappedW, hRate, bWindow, 10) sf.Run() // Create a decoder dec := codec.NewDecoder(r, jsonHandle) - //files := []string{"1", "2", "3", "4", "5"} - files := []string{"1"} - input := bytes.NewBuffer(make([]byte, 100000)) - for i := 0; i <= 2000; i++ { - str := strconv.Itoa(i) + "\n" + files := []string{"1", "2", "3", "4", "5"} + input := bytes.NewBuffer(make([]byte, 0, 100000)) + for i := 0; i <= 1000; i++ { + str := strconv.Itoa(i) + "," input.WriteString(str) } - expected := bytes.NewBuffer(make([]byte, 100000)) + expected := bytes.NewBuffer(make([]byte, 0, 100000)) for _, _ = range files { expected.Write(input.Bytes()) } - receivedBuf := bytes.NewBuffer(make([]byte, 100000)) + receivedBuf := bytes.NewBuffer(make([]byte, 0, 100000)) // Start the reader resultCh := make(chan struct{}) @@ -373,10 +374,19 @@ func TestStreamFramer_Order(t *testing.T) { // Ensure we get data select { case <-resultCh: - case <-time.After(4 * bWindow): + case <-time.After(10 * bWindow): got := receivedBuf.String() want := expected.String() - t.Fatalf("Did not receive data in sorted order\nGot:%v\nWant:%v\n", got, want) + diff := difflib.ContextDiff{ + A: difflib.SplitLines(strings.Replace(got, ",", "\n", -1)), + B: difflib.SplitLines(strings.Replace(want, ",", "\n", -1)), + FromFile: "Got", + ToFile: "Want", + Context: 3, + Eol: "\n", + } + result, _ := difflib.GetContextDiffString(diff) + t.Fatalf(strings.Replace(result, "\t", " ", -1)) } // Close the reader and wait. This should cause the runner to exit diff --git a/command/agent/http.go b/command/agent/http.go index 5d6c42aa00e8..197effe481ca 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -60,7 +60,7 @@ func NewHTTPServer(agent *Agent, config *Config, logOutput io.Writer) (*HTTPServ agent: agent, mux: mux, listener: ln, - logger: log.New(logOutput, "", log.LstdFlags), + logger: agent.logger, addr: ln.Addr().String(), } srv.registerHandlers(config.EnableDebug) From 4cf65d7944f0ba7e48ea8eb293ebd1f19540d47a Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 19 Jul 2016 15:58:02 -0700 Subject: [PATCH 05/16] Read from correct offset --- command/agent/fs_endpoint.go | 116 +++++++++++------ command/agent/fs_endpoint_test.go | 206 ++++++++++++++++++++++++++++++ command/logs.go | 13 +- 3 files changed, 290 insertions(+), 45 deletions(-) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 37a4c1df4049..225530561ccf 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "path/filepath" + "sort" "strconv" "strings" "sync" @@ -726,21 +727,11 @@ func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdi return fmt.Errorf("failed to list entries: %v", err) } - logEntry, idx, err := findClosest(entries, nextIdx, task, logType) + logEntry, idx, openOffset, err := findClosest(entries, nextIdx, offset, task, logType) if err != nil { return err } - // Apply the offset we should open at. Handling the negative case is - // only for the first time. - openOffset := offset - if openOffset < 0 { - openOffset = logEntry.Size + openOffset - if openOffset < 0 { - openOffset = 0 - } - } - p := filepath.Join(logPath, logEntry.Name) nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1)) nextExists := fs.BlockUntilExists(nextPath, &t) @@ -763,26 +754,36 @@ func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdi return nil } -func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx int64, - task, logType string) (*allocdir.AllocFileInfo, int64, error) { +// indexTuple and indexTupleArray are used to find the correct log entry to +// start streaming logs from +type indexTuple struct { + idx int64 + entry *allocdir.AllocFileInfo +} - if len(entries) == 0 { - return nil, 0, fmt.Errorf("no file entries found") - } +type indexTupleArray []indexTuple - prefix := fmt.Sprintf("%s.%s.", task, logType) +func (a indexTupleArray) Len() int { return len(a) } +func (a indexTupleArray) Less(i, j int) bool { return a[i].idx < a[j].idx } +func (a indexTupleArray) Swap(i, j int) { a[i], a[j] = a[j], a[i] } - var closest *allocdir.AllocFileInfo - closestIdx := int64(math.MaxInt64) - closestDist := int64(math.MaxInt64) +// findClosest takes a list of entries, the desired log index and desired log +// offset (which can be negative, treated as offset from end), task name and log +// type and returns the log entry, the log index, the offset to read from and a +// potential error. +func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx, desiredOffset int64, + task, logType string) (*allocdir.AllocFileInfo, int64, int64, error) { + + // Build the matching indexes + var indexes []indexTuple + prefix := fmt.Sprintf("%s.%s.", task, logType) for _, entry := range entries { if entry.IsDir { continue } - idxStr := strings.TrimPrefix(entry.Name, prefix) - // If nothing was trimmed, then it is not a match + idxStr := strings.TrimPrefix(entry.Name, prefix) if idxStr == entry.Name { continue } @@ -790,25 +791,68 @@ func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx int64, // Convert to an int idx, err := strconv.Atoi(idxStr) if err != nil { - return nil, 0, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err) + return nil, 0, 0, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err) } - // Determine distance to desired - d := desiredIdx - int64(idx) - if d < 0 { - d *= -1 - } + indexes = append(indexes, indexTuple{idx: int64(idx), entry: entry}) + } - if d <= closestDist && (int64(idx) < closestIdx || int64(idx) == desiredIdx) { - closestDist = d - closest = entry - closestIdx = int64(idx) - } + if len(indexes) == 0 { + return nil, 0, 0, fmt.Errorf("log entry for task %q and log type %q not found", task, logType) + } + + // Binary search the indexes to get the desiredIdx + sort.Sort(indexTupleArray(indexes)) + i := sort.Search(len(indexes), func(i int) bool { return indexes[i].idx >= desiredIdx }) + l := len(indexes) + if i == l { + // Use the last index if the number is bigger than all of them. + i = l - 1 } - if closest == nil { - return nil, 0, fmt.Errorf("log entry for task %q and log type %q not found", task, logType) + // Get to the correct offset + offset := desiredOffset + idx := int64(i) + for { + s := indexes[idx].entry.Size + + // Base case + if offset == 0 { + break + } else if offset < 0 { + // Going backwards + if newOffset := s + offset; newOffset >= 0 { + // Current file works + offset = newOffset + break + } else if idx == 0 { + // Already at the end + offset = 0 + break + } else { + // Try the file before + offset = newOffset + idx -= 1 + continue + } + } else { + // Going forward + if offset <= s { + // Current file works + break + } else if idx == int64(l-1) { + // Already at the end + offset = s + break + } else { + // Try the next file + offset = offset - s + idx += 1 + continue + } + + } } - return closest, closestIdx, nil + return indexes[idx].entry, indexes[idx].idx, offset, nil } diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index c8c6d827fd45..ced3708af87d 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -4,6 +4,7 @@ import ( "bytes" "io" "io/ioutil" + "math" "net/http" "net/http/httptest" "os" @@ -743,3 +744,208 @@ func TestHTTP_Stream_Delete(t *testing.T) { }) } + +func TestLogs_findClosest(t *testing.T) { + task := "foo" + entries := []*allocdir.AllocFileInfo{ + { + Name: "foo.stdout.0", + Size: 100, + }, + { + Name: "foo.stdout.1", + Size: 100, + }, + { + Name: "foo.stdout.2", + Size: 100, + }, + { + Name: "foo.stdout.3", + Size: 100, + }, + { + Name: "foo.stderr.0", + Size: 100, + }, + { + Name: "foo.stderr.1", + Size: 100, + }, + { + Name: "foo.stderr.2", + Size: 100, + }, + } + + cases := []struct { + Entries []*allocdir.AllocFileInfo + DesiredIdx int64 + DesiredOffset int64 + Task string + LogType string + ExpectedFile string + ExpectedIdx int64 + ExpectedOffset int64 + Error bool + }{ + // Test error cases + { + Entries: nil, + DesiredIdx: 0, + Task: task, + LogType: "stdout", + Error: true, + }, + { + Entries: entries[0:3], + DesiredIdx: 0, + Task: task, + LogType: "stderr", + Error: true, + }, + + // Test begining cases + { + Entries: entries, + DesiredIdx: 0, + Task: task, + LogType: "stdout", + ExpectedFile: entries[0].Name, + ExpectedIdx: 0, + }, + { + // Desired offset should be ignored at edges + Entries: entries, + DesiredIdx: 0, + DesiredOffset: -100, + Task: task, + LogType: "stdout", + ExpectedFile: entries[0].Name, + ExpectedIdx: 0, + ExpectedOffset: 0, + }, + { + // Desired offset should be ignored at edges + Entries: entries, + DesiredIdx: 1, + DesiredOffset: -1000, + Task: task, + LogType: "stdout", + ExpectedFile: entries[0].Name, + ExpectedIdx: 0, + ExpectedOffset: 0, + }, + { + Entries: entries, + DesiredIdx: 0, + Task: task, + LogType: "stderr", + ExpectedFile: entries[4].Name, + ExpectedIdx: 0, + }, + { + Entries: entries, + DesiredIdx: 0, + Task: task, + LogType: "stdout", + ExpectedFile: entries[0].Name, + ExpectedIdx: 0, + }, + + // Test middle cases + { + Entries: entries, + DesiredIdx: 1, + Task: task, + LogType: "stdout", + ExpectedFile: entries[1].Name, + ExpectedIdx: 1, + }, + { + Entries: entries, + DesiredIdx: 1, + DesiredOffset: 10, + Task: task, + LogType: "stdout", + ExpectedFile: entries[1].Name, + ExpectedIdx: 1, + ExpectedOffset: 10, + }, + { + Entries: entries, + DesiredIdx: 1, + DesiredOffset: 110, + Task: task, + LogType: "stdout", + ExpectedFile: entries[2].Name, + ExpectedIdx: 2, + ExpectedOffset: 10, + }, + { + Entries: entries, + DesiredIdx: 1, + Task: task, + LogType: "stderr", + ExpectedFile: entries[5].Name, + ExpectedIdx: 1, + }, + // Test end cases + { + Entries: entries, + DesiredIdx: math.MaxInt64, + Task: task, + LogType: "stdout", + ExpectedFile: entries[3].Name, + ExpectedIdx: 3, + }, + { + Entries: entries, + DesiredIdx: math.MaxInt64, + DesiredOffset: math.MaxInt64, + Task: task, + LogType: "stdout", + ExpectedFile: entries[3].Name, + ExpectedIdx: 3, + ExpectedOffset: 100, + }, + { + Entries: entries, + DesiredIdx: math.MaxInt64, + DesiredOffset: -10, + Task: task, + LogType: "stdout", + ExpectedFile: entries[3].Name, + ExpectedIdx: 3, + ExpectedOffset: 90, + }, + { + Entries: entries, + DesiredIdx: math.MaxInt64, + Task: task, + LogType: "stderr", + ExpectedFile: entries[6].Name, + ExpectedIdx: 2, + }, + } + + for i, c := range cases { + entry, idx, offset, err := findClosest(c.Entries, c.DesiredIdx, c.DesiredOffset, c.Task, c.LogType) + if err != nil { + if !c.Error { + t.Fatalf("case %d: Unexpected error: %v", i, err) + } + continue + } + + if entry.Name != c.ExpectedFile { + t.Fatalf("case %d: Got file %q; want %q", i, entry.Name, c.ExpectedFile) + } + if idx != c.ExpectedIdx { + t.Fatalf("case %d: Got index %d; want %d", i, idx, c.ExpectedIdx) + } + if offset != c.ExpectedOffset { + t.Fatalf("case %d: Got offset %d; want %d", i, offset, c.ExpectedOffset) + } + } +} diff --git a/command/logs.go b/command/logs.go index 25edc65db0ed..282783d8db09 100644 --- a/command/logs.go +++ b/command/logs.go @@ -159,7 +159,7 @@ func (l *LogsCommand) Run(args []string) int { var r io.ReadCloser var readErr error if !tail { - r, readErr = l.followFile(client, alloc, task, logType, api.OriginStart, 0, -1) + r, readErr = l.followFile(client, alloc, task, logType, api.OriginStart, 0) if readErr != nil { readErr = fmt.Errorf("Error reading file: %v", readErr) } @@ -178,7 +178,7 @@ func (l *LogsCommand) Run(args []string) int { numLines = defaultTailLines } - r, readErr = l.followFile(client, alloc, task, logType, api.OriginEnd, offset, numLines) + r, readErr = l.followFile(client, alloc, task, logType, api.OriginEnd, offset) // If numLines is set, wrap the reader if numLines != -1 { @@ -201,9 +201,9 @@ func (l *LogsCommand) Run(args []string) int { } // followFile outputs the contents of the file to stdout relative to the end of -// the file. If numLines does not equal -1, then tail -n behavior is used. +// the file. func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation, - task, logType, origin string, offset, numLines int64) (io.ReadCloser, error) { + task, logType, origin string, offset int64) (io.ReadCloser, error) { cancel := make(chan struct{}) frames, _, err := client.AllocFS().Logs(alloc, task, logType, origin, offset, cancel, nil) @@ -218,11 +218,6 @@ func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation, frameReader := api.NewFrameReader(frames, cancel) r = frameReader - // If numLines is set, wrap the reader - if numLines != -1 { - r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) - } - go func() { <-signalCh From ac1cfd182153296c75df1e1bcc52d2802d33d402 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 19 Jul 2016 19:48:16 -0700 Subject: [PATCH 06/16] unblock the readers to add liveness when using -n --- api/fs.go | 29 +++++++++++++++---- api/fs_test.go | 37 ++++++++++++++++++++++++ command/fs.go | 4 +-- command/helpers.go | 57 +++++++++++++++++++++++++++++++++---- command/helpers_test.go | 63 +++++++++++++++++++++++++++++++++++++++-- command/logs.go | 6 ++-- 6 files changed, 179 insertions(+), 17 deletions(-) diff --git a/api/fs.go b/api/fs.go index 4528b36b065b..5be452c2bfdc 100644 --- a/api/fs.go +++ b/api/fs.go @@ -362,6 +362,8 @@ type FrameReader struct { cancelCh chan struct{} closed bool + unblockTime time.Duration + frame *StreamFrame frameOffset int @@ -381,6 +383,12 @@ func NewFrameReader(frames <-chan *StreamFrame, cancelCh chan struct{}) *FrameRe } } +// SetUnblockTime sets the time to unblock and return zero bytes read. If the +// duration is unset or is zero or less, the read will block til data is read. +func (f *FrameReader) SetUnblockTime(d time.Duration) { + f.unblockTime = d +} + // Offset returns the offset into the stream. func (f *FrameReader) Offset() int { return f.byteOffset @@ -390,14 +398,23 @@ func (f *FrameReader) Offset() int { // when there are no more frames. func (f *FrameReader) Read(p []byte) (n int, err error) { if f.frame == nil { - frame, ok := <-f.frames - if !ok { - return 0, io.EOF + var unblock <-chan time.Time + if f.unblockTime.Nanoseconds() > 0 { + unblock = time.After(f.unblockTime) } - f.frame = frame - // Store the total offset into the file - f.byteOffset = int(f.frame.Offset) + select { + case frame, ok := <-f.frames: + if !ok { + return 0, io.EOF + } + f.frame = frame + + // Store the total offset into the file + f.byteOffset = int(f.frame.Offset) + case <-unblock: + return 0, nil + } } if f.frame.FileEvent != "" && len(f.fileEvent) == 0 { diff --git a/api/fs_test.go b/api/fs_test.go index 25107d9c88ed..bea8d5984420 100644 --- a/api/fs_test.go +++ b/api/fs_test.go @@ -4,6 +4,7 @@ import ( "io" "reflect" "testing" + "time" ) func TestFS_FrameReader(t *testing.T) { @@ -73,3 +74,39 @@ func TestFS_FrameReader(t *testing.T) { t.Fatalf("offset %d, wanted %d", r.Offset(), len(expected)) } } + +func TestFS_FrameReader_Unblock(t *testing.T) { + // Create a channel of the frames and a cancel channel + framesCh := make(chan *StreamFrame, 3) + cancelCh := make(chan struct{}) + + r := NewFrameReader(framesCh, cancelCh) + r.SetUnblockTime(10 * time.Millisecond) + + // Read a little + p := make([]byte, 12) + + n, err := r.Read(p) + if err != nil { + t.Fatalf("Read failed: %v", err) + } + + if n != 0 { + t.Fatalf("should have unblocked") + } + + // Unset the unblock + r.SetUnblockTime(0) + + resultCh := make(chan struct{}) + go func() { + r.Read(p) + close(resultCh) + }() + + select { + case <-resultCh: + t.Fatalf("shouldn't have unblocked") + case <-time.After(300 * time.Millisecond): + } +} diff --git a/command/fs.go b/command/fs.go index 2cf1b45c9e99..a9d67492a8d9 100644 --- a/command/fs.go +++ b/command/fs.go @@ -282,7 +282,7 @@ func (f *FSCommand) Run(args []string) int { // If numLines is set, wrap the reader if numLines != -1 { - r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 0) } } @@ -321,7 +321,7 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, // If numLines is set, wrap the reader if numLines != -1 { - r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 0) } go func() { diff --git a/command/helpers.go b/command/helpers.go index 36f3455cc453..b9c4a6be7af2 100644 --- a/command/helpers.go +++ b/command/helpers.go @@ -105,17 +105,26 @@ type LineLimitReader struct { lines int searchLimit int + timeLimit time.Duration + lastRead time.Time + buffer *bytes.Buffer bufFiled bool foundLines bool } // NewLineLimitReader takes the ReadCloser to wrap, the number of lines to find -// searching backwards in the first searchLimit bytes. -func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int) *LineLimitReader { +// searching backwards in the first searchLimit bytes. timeLimit can optionally +// be specified by passing a non-zero duration. When set, the search for the +// last n lines is aborted if no data has been read in the duration. This +// can be used to flush what is had if no extra data is being received. When +// used, the underlying reader must not block forever and must periodically +// unblock even when no data has been read. +func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int, timeLimit time.Duration) *LineLimitReader { return &LineLimitReader{ ReadCloser: r, searchLimit: searchLimit, + timeLimit: timeLimit, lines: lines, buffer: bytes.NewBuffer(make([]byte, 0, searchLimit)), } @@ -124,14 +133,52 @@ func NewLineLimitReader(r io.ReadCloser, lines, searchLimit int) *LineLimitReade func (l *LineLimitReader) Read(p []byte) (n int, err error) { // Fill up the buffer so we can find the correct number of lines. if !l.bufFiled { - _, err := l.buffer.ReadFrom(io.LimitReader(l.ReadCloser, int64(l.searchLimit))) + b := make([]byte, len(p)) + n, err := l.ReadCloser.Read(b) + if n > 0 { + if _, err := l.buffer.Write(b[:n]); err != nil { + return 0, err + } + } + if err != nil { - return 0, err + if err != io.EOF { + return 0, err + } + + l.bufFiled = true + goto READ + } + + if l.buffer.Len() >= l.searchLimit { + l.bufFiled = true + goto READ + } + + if l.timeLimit.Nanoseconds() > 0 { + if l.lastRead.IsZero() { + l.lastRead = time.Now() + return 0, nil + } + + now := time.Now() + if n == 0 { + // We hit the limit + if l.lastRead.Add(l.timeLimit).Before(now) { + l.bufFiled = true + goto READ + } else { + return 0, nil + } + } else { + l.lastRead = now + } } - l.bufFiled = true + return 0, nil } +READ: if l.bufFiled && l.buffer.Len() != 0 { b := l.buffer.Bytes() diff --git a/command/helpers_test.go b/command/helpers_test.go index c2dd64e821c6..f43e701e7749 100644 --- a/command/helpers_test.go +++ b/command/helpers_test.go @@ -1,9 +1,12 @@ package command import ( + "io" "io/ioutil" + "reflect" "strings" "testing" + "time" "github.com/mitchellh/cli" ) @@ -48,7 +51,7 @@ func TestHelpers_NodeID(t *testing.T) { } } -func TestHelpers_LineLimitReader(t *testing.T) { +func TestHelpers_LineLimitReader_NoTimeLimit(t *testing.T) { helloString := `hello world this @@ -114,7 +117,7 @@ test`, for i, c := range cases { in := ioutil.NopCloser(strings.NewReader(c.Input)) - limit := NewLineLimitReader(in, c.Lines, c.SearchLimit) + limit := NewLineLimitReader(in, c.Lines, c.SearchLimit, 0) outBytes, err := ioutil.ReadAll(limit) if err != nil { t.Fatalf("case %d failed: %v", i, err) @@ -126,3 +129,59 @@ test`, } } } + +type testReadCloser struct { + data chan []byte +} + +func (t *testReadCloser) Read(p []byte) (n int, err error) { + select { + case b, ok := <-t.data: + if !ok { + return 0, io.EOF + } + + return copy(p, b), nil + case <-time.After(10 * time.Millisecond): + return 0, nil + } +} + +func (t *testReadCloser) Close() error { + close(t.data) + return nil +} + +func TestHelpers_LineLimitReader_TimeLimit(t *testing.T) { + // Create the test reader + in := &testReadCloser{data: make(chan []byte)} + + // Set up the reader such that it won't hit the line/buffer limit and could + // only terminate if it hits the time limit + limit := NewLineLimitReader(in, 1000, 1000, 100*time.Millisecond) + + expected := []byte("hello world") + + resultCh := make(chan struct{}) + go func() { + outBytes, err := ioutil.ReadAll(limit) + if err != nil { + t.Fatalf("ReadAll failed: %v", err) + } + + if reflect.DeepEqual(outBytes, expected) { + close(resultCh) + return + } + }() + + // Send the data + in.data <- expected + in.Close() + + select { + case <-resultCh: + case <-time.After(1 * time.Second): + t.Fatalf("did not exit by time limit") + } +} diff --git a/command/logs.go b/command/logs.go index 282783d8db09..d7073f43d338 100644 --- a/command/logs.go +++ b/command/logs.go @@ -7,6 +7,7 @@ import ( "os/signal" "strings" "syscall" + "time" "github.com/hashicorp/nomad/api" ) @@ -33,7 +34,7 @@ Logs Specific Options: -job Use a random allocation from a specified job-id. - -tail + -tail Show the files contents with offsets relative to the end of the file. If no offset is given, -n is defaulted to 10. @@ -182,7 +183,7 @@ func (l *LogsCommand) Run(args []string) int { // If numLines is set, wrap the reader if numLines != -1 { - r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines)) + r = NewLineLimitReader(r, int(numLines), int(numLines*bytesToLines), 1*time.Second) } if readErr != nil { @@ -216,6 +217,7 @@ func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation, // Create a reader var r io.ReadCloser frameReader := api.NewFrameReader(frames, cancel) + frameReader.SetUnblockTime(500 * time.Millisecond) r = frameReader go func() { From 8cb4f76a65710716948e86cbc43c2e8724621a93 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 19 Jul 2016 19:51:49 -0700 Subject: [PATCH 07/16] remove file events and last offset --- api/fs.go | 21 --------------------- command/fs.go | 3 --- command/logs.go | 3 --- 3 files changed, 27 deletions(-) diff --git a/api/fs.go b/api/fs.go index 5be452c2bfdc..42ab3e579620 100644 --- a/api/fs.go +++ b/api/fs.go @@ -367,10 +367,6 @@ type FrameReader struct { frame *StreamFrame frameOffset int - // To handle printing the file events - fileEventOffset int - fileEvent []byte - byteOffset int } @@ -417,23 +413,6 @@ func (f *FrameReader) Read(p []byte) (n int, err error) { } } - if f.frame.FileEvent != "" && len(f.fileEvent) == 0 { - f.fileEvent = []byte(fmt.Sprintf("\nnomad: %q\n", f.frame.FileEvent)) - f.fileEventOffset = 0 - } - - // If there is a file event we inject it into the read stream - if l := len(f.fileEvent); l != 0 && l != f.fileEventOffset { - n = copy(p, f.fileEvent[f.fileEventOffset:]) - f.fileEventOffset += n - return n, nil - } - - if len(f.fileEvent) == f.fileEventOffset { - f.fileEvent = nil - f.fileEventOffset = 0 - } - // Copy the data out of the frame and update our offset n = copy(p, f.frame.Data[f.frameOffset:]) f.frameOffset += n diff --git a/command/fs.go b/command/fs.go index a9d67492a8d9..a118950fe472 100644 --- a/command/fs.go +++ b/command/fs.go @@ -329,9 +329,6 @@ func (f *FSCommand) followFile(client *api.Client, alloc *api.Allocation, // End the streaming r.Close() - - // Output the last offset - f.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frameReader.Offset())) }() return r, nil diff --git a/command/logs.go b/command/logs.go index d7073f43d338..fbb48f80fef7 100644 --- a/command/logs.go +++ b/command/logs.go @@ -225,9 +225,6 @@ func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation, // End the streaming r.Close() - - // Output the last offset - l.Ui.Output(fmt.Sprintf("\nLast outputted offset (bytes): %d", frameReader.Offset())) }() return r, nil From efc4700d426f10678f92f60f53f96428f35e9a1a Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 20 Jul 2016 08:53:59 -0700 Subject: [PATCH 08/16] fix indents --- command/fs.go | 8 ++++---- command/logs.go | 18 +++++++++--------- command/plan.go | 4 ++-- command/run.go | 18 +++++++++--------- command/stop.go | 6 +++--- 5 files changed, 27 insertions(+), 27 deletions(-) diff --git a/command/fs.go b/command/fs.go index a118950fe472..66bce5bb659f 100644 --- a/command/fs.go +++ b/command/fs.go @@ -60,12 +60,12 @@ FS Specific Options: rather to wait for additional output. -tail - Show the files contents with offsets relative to the end of the file. If no - offset is given, -n is defaulted to 10. + Show the files contents with offsets relative to the end of the file. If no + offset is given, -n is defaulted to 10. -n - Sets the tail location in best-efforted number of lines relative to the end - of the file. + Sets the tail location in best-efforted number of lines relative to the end + of the file. -c Sets the tail location in number of bytes relative to the end of the file. diff --git a/command/logs.go b/command/logs.go index fbb48f80fef7..e117090212b7 100644 --- a/command/logs.go +++ b/command/logs.go @@ -20,11 +20,11 @@ func (l *LogsCommand) Help() string { helpText := ` Usage: nomad logs [options] -TODO + Streams the stdout/stderr of the given allocation and task. General Options: - ` + generalOptionsUsage() + ` + ` + generalOptionsUsage() + ` Logs Specific Options: @@ -35,21 +35,21 @@ Logs Specific Options: Use a random allocation from a specified job-id. -tail - Show the files contents with offsets relative to the end of the file. If no - offset is given, -n is defaulted to 10. + Show the files contents with offsets relative to the end of the file. If no + offset is given, -n is defaulted to 10. -n - Sets the tail location in best-efforted number of lines relative to the end - of the file. + Sets the tail location in best-efforted number of lines relative to the end + of the file. -c - Sets the tail location in number of bytes relative to the end of the file. -` + Sets the tail location in number of bytes relative to the end of the file. + ` return strings.TrimSpace(helpText) } func (l *LogsCommand) Synopsis() string { - return "Inspect the contents of an allocation directory" + return "Streams the logs of a task." } func (l *LogsCommand) Run(args []string) int { diff --git a/command/plan.go b/command/plan.go index aa85aceb0742..4b404cfd48e6 100644 --- a/command/plan.go +++ b/command/plan.go @@ -63,8 +63,8 @@ General Options: Plan Options: -diff - Determines whether the diff between the remote job and planned job is shown. - Defaults to true. + Determines whether the diff between the remote job and planned job is shown. + Defaults to true. -verbose Increase diff verbosity. diff --git a/command/run.go b/command/run.go index c983750ac846..e0ed219a5a40 100644 --- a/command/run.go +++ b/command/run.go @@ -62,17 +62,17 @@ General Options: Run Options: -check-index - If set, the job is only registered or updated if the the passed - job modify index matches the server side version. If a check-index value of - zero is passed, the job is only registered if it does not yet exist. If a - non-zero value is passed, it ensures that the job is being updated from a - known state. The use of this flag is most common in conjunction with plan - command. + If set, the job is only registered or updated if the the passed + job modify index matches the server side version. If a check-index value of + zero is passed, the job is only registered if it does not yet exist. If a + non-zero value is passed, it ensures that the job is being updated from a + known state. The use of this flag is most common in conjunction with plan + command. -detach - Return immediately instead of entering monitor mode. After job submission, - the evaluation ID will be printed to the screen, which can be used to - examine the evaluation using the eval-status command. + Return immediately instead of entering monitor mode. After job submission, + the evaluation ID will be printed to the screen, which can be used to + examine the evaluation using the eval-status command. -verbose Display full information. diff --git a/command/stop.go b/command/stop.go index 3a1625397c49..1f19baae56af 100644 --- a/command/stop.go +++ b/command/stop.go @@ -27,9 +27,9 @@ Stop Options: -detach Return immediately instead of entering monitor mode. After the - deregister command is submitted, a new evaluation ID is printed to the - screen, which can be used to examine the evaluation using the eval-status - command. + deregister command is submitted, a new evaluation ID is printed to the + screen, which can be used to examine the evaluation using the eval-status + command. -yes Automatic yes to prompts. From fff91cf8f0dd1b15cd8f6c313ce8963e4c381307 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 20 Jul 2016 09:13:48 -0700 Subject: [PATCH 09/16] Look up the tasks automatically --- command/logs.go | 46 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/command/logs.go b/command/logs.go index e117090212b7..1588589934c2 100644 --- a/command/logs.go +++ b/command/logs.go @@ -70,22 +70,16 @@ func (l *LogsCommand) Run(args []string) int { } args = flags.Args() - if len(args) < 2 { + if len(args) < 1 { if job { - l.Ui.Error("job ID and task name required") + l.Ui.Error("Job ID required") } else { - l.Ui.Error("allocation ID and task name required") + l.Ui.Error("Allocation ID required") } return 1 } - task := args[1] - if task == "" { - l.Ui.Error("task name required") - return 1 - } - client, err := l.Meta.Client() if err != nil { l.Ui.Error(fmt.Sprintf("Error initializing client: %v", err)) @@ -151,6 +145,39 @@ func (l *LogsCommand) Run(args []string) int { return 1 } + var task string + if len(args) >= 2 { + task = args[1] + if task == "" { + l.Ui.Error("Task name required") + return 1 + } + + } else { + // Try to determine the tasks name from the allocation + var tasks []*api.Task + for _, tg := range alloc.Job.TaskGroups { + if tg.Name == alloc.TaskGroup { + if len(tg.Tasks) == 1 { + task = tg.Tasks[0].Name + break + } + + tasks = tg.Tasks + break + } + } + + if task == "" { + l.Ui.Error(fmt.Sprintf("Allocation %q is running the following tasks:", limit(alloc.ID, length))) + for _, t := range tasks { + l.Ui.Error(fmt.Sprintf(" * %s", t.Name)) + } + l.Ui.Error("\nPlease specify the task.") + return 1 + } + } + logType := "stdout" if stderr { logType = "stderr" @@ -209,6 +236,7 @@ func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation, cancel := make(chan struct{}) frames, _, err := client.AllocFS().Logs(alloc, task, logType, origin, offset, cancel, nil) if err != nil { + panic(err.Error()) return nil, err } signalCh := make(chan os.Signal, 1) From b65fd2624e0f872636b8cccdebfc2ba007341884 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 20 Jul 2016 10:18:05 -0700 Subject: [PATCH 10/16] Support non-following logs --- api/fs.go | 8 ++++-- command/agent/fs_endpoint.go | 56 ++++++++++++++++++++++++++++++------ command/logs.go | 15 ++++++---- 3 files changed, 63 insertions(+), 16 deletions(-) diff --git a/api/fs.go b/api/fs.go index 42ab3e579620..70635302847f 100644 --- a/api/fs.go +++ b/api/fs.go @@ -279,15 +279,16 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, // Logs streams the content of a tasks logs blocking on EOF. // The parameters are: // * allocation: the allocation to stream from. +// * follow: Whether the logs should be followed. // * task: the tasks name to stream logs for. // * logType: Either "stdout" or "stderr" -// * offset: The offset to start streaming data at. // * origin: Either "start" or "end" and defines from where the offset is applied. +// * offset: The offset to start streaming data at. // * cancel: A channel that when closed, streaming will end. // // The return value is a channel that will emit StreamFrames as they are read. -func (a *AllocFS) Logs(alloc *Allocation, task, logType, origin string, offset int64, - cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, *QueryMeta, error) { +func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string, + offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, *QueryMeta, error) { node, _, err := a.client.Nodes().Info(alloc.NodeID, q) if err != nil { @@ -303,6 +304,7 @@ func (a *AllocFS) Logs(alloc *Allocation, task, logType, origin string, offset i Path: fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID), } v := url.Values{} + v.Set("follow", strconv.FormatBool(follow)) v.Set("task", task) v.Set("type", logType) v.Set("origin", origin) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 225530561ccf..2eba9f456b6d 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -350,17 +350,16 @@ OUTER: } // Flush any existing frames - s.l.Lock() select { case o := <-s.outbound: // Send the frame and then clear the current working frame if err = s.enc.Encode(o); err != nil { - s.l.Unlock() return } default: } + s.l.Lock() if s.f != nil { s.f.Data = s.readData() s.enc.Encode(s.f) @@ -622,7 +621,11 @@ OUTER: continue OUTER case <-framer.ExitCh(): return nil - case err := <-eofCancelCh: + case err, ok := <-eofCancelCh: + if !ok { + return nil + } + return err } } @@ -634,11 +637,13 @@ OUTER: // Logs streams the content of a log blocking on EOF. The parameters are: // * task: task name to stream logs for. // * type: stdout/stderr to stream. +// * follow: A boolean of whether to follow the logs. // * offset: The offset to start streaming data at, defaults to zero. // * origin: Either "start" or "end" and defines from where the offset is // applied. Defaults to "start". func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var allocID, task, logType string + var follow bool var err error q := req.URL.Query() @@ -651,6 +656,10 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac return nil, taskNotPresentErr } + if follow, err = strconv.ParseBool(q.Get("follow")); err != nil { + return nil, fmt.Errorf("Failed to parse follow field to boolean: %v", err) + } + logType = q.Get("type") switch logType { case "stdout", "stderr": @@ -684,10 +693,13 @@ func (s *HTTPServer) Logs(resp http.ResponseWriter, req *http.Request) (interfac // Create an output that gets flushed on every write output := ioutils.NewWriteFlusher(resp) - return nil, s.logs(offset, origin, task, logType, fs, output) + return nil, s.logs(follow, offset, origin, task, logType, fs, output) } -func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdir.AllocDirFS, output io.WriteCloser) error { +func (s *HTTPServer) logs(follow bool, offset int64, + origin, task, logType string, + fs allocdir.AllocDirFS, output io.WriteCloser) error { + // Create the framer framer := NewStreamFramer(output, streamHeartbeatRate, streamBatchWindow, streamFrameSize) framer.Run() @@ -727,15 +739,39 @@ func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdi return fmt.Errorf("failed to list entries: %v", err) } + // If we are not following logs, determine the max index for the logs we are + // interested in so we can stop there. + maxIndex := int64(math.MaxInt64) + if !follow { + _, idx, _, err := findClosest(entries, maxIndex, 0, task, logType) + if err != nil { + return err + } + maxIndex = idx + } + logEntry, idx, openOffset, err := findClosest(entries, nextIdx, offset, task, logType) if err != nil { return err } + var eofCancelCh chan error + exitAfter := false + if !follow && idx > maxIndex { + // Exceeded what was there initially so return + return nil + } else if !follow && idx == maxIndex { + // At the end + eofCancelCh = make(chan error) + close(eofCancelCh) + exitAfter = true + } else { + nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1)) + eofCancelCh = fs.BlockUntilExists(nextPath, &t) + } + p := filepath.Join(logPath, logEntry.Name) - nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1)) - nextExists := fs.BlockUntilExists(nextPath, &t) - err = s.stream(openOffset, p, fs, framer, nextExists) + err = s.stream(openOffset, p, fs, framer, eofCancelCh) // Check if there was an error where the file does not exist. That means // it got rotated out from under us. @@ -746,6 +782,10 @@ func (s *HTTPServer) logs(offset int64, origin, task, logType string, fs allocdi return err } + if exitAfter { + return nil + } + //Since we successfully streamed, update the overall offset/idx. offset = int64(0) nextIdx = idx + 1 diff --git a/command/logs.go b/command/logs.go index 1588589934c2..e5aa6297a400 100644 --- a/command/logs.go +++ b/command/logs.go @@ -34,6 +34,10 @@ Logs Specific Options: -job Use a random allocation from a specified job-id. + -f + Causes the output to not stop when the end of the logs are reached, but + rather to wait for additional output. + -tail Show the files contents with offsets relative to the end of the file. If no offset is given, -n is defaulted to 10. @@ -53,7 +57,7 @@ func (l *LogsCommand) Synopsis() string { } func (l *LogsCommand) Run(args []string) int { - var verbose, job, tail, stderr bool + var verbose, job, tail, stderr, follow bool var numLines, numBytes int64 flags := l.Meta.FlagSet("logs-list", FlagSetClient) @@ -61,6 +65,7 @@ func (l *LogsCommand) Run(args []string) int { flags.BoolVar(&verbose, "verbose", false, "") flags.BoolVar(&job, "job", false, "") flags.BoolVar(&tail, "tail", false, "") + flags.BoolVar(&follow, "f", false, "") flags.BoolVar(&stderr, "stderr", false, "") flags.Int64Var(&numLines, "n", -1, "") flags.Int64Var(&numBytes, "c", -1, "") @@ -187,7 +192,7 @@ func (l *LogsCommand) Run(args []string) int { var r io.ReadCloser var readErr error if !tail { - r, readErr = l.followFile(client, alloc, task, logType, api.OriginStart, 0) + r, readErr = l.followFile(client, alloc, follow, task, logType, api.OriginStart, 0) if readErr != nil { readErr = fmt.Errorf("Error reading file: %v", readErr) } @@ -206,7 +211,7 @@ func (l *LogsCommand) Run(args []string) int { numLines = defaultTailLines } - r, readErr = l.followFile(client, alloc, task, logType, api.OriginEnd, offset) + r, readErr = l.followFile(client, alloc, follow, task, logType, api.OriginEnd, offset) // If numLines is set, wrap the reader if numLines != -1 { @@ -231,10 +236,10 @@ func (l *LogsCommand) Run(args []string) int { // followFile outputs the contents of the file to stdout relative to the end of // the file. func (l *LogsCommand) followFile(client *api.Client, alloc *api.Allocation, - task, logType, origin string, offset int64) (io.ReadCloser, error) { + follow bool, task, logType, origin string, offset int64) (io.ReadCloser, error) { cancel := make(chan struct{}) - frames, _, err := client.AllocFS().Logs(alloc, task, logType, origin, offset, cancel, nil) + frames, _, err := client.AllocFS().Logs(alloc, follow, task, logType, origin, offset, cancel, nil) if err != nil { panic(err.Error()) return nil, err From f22e228e8216ee59052464847c53ff16d8869598 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 20 Jul 2016 13:06:05 -0700 Subject: [PATCH 11/16] log tests --- command/agent/fs_endpoint.go | 27 ++++- command/agent/fs_endpoint_test.go | 189 ++++++++++++++++++++++++++++++ 2 files changed, 213 insertions(+), 3 deletions(-) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 2eba9f456b6d..bc24de702b13 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "sync" + "syscall" "time" "gopkg.in/tomb.v1" @@ -43,8 +44,16 @@ const ( // being flushed if the frame size has not been hit. streamBatchWindow = 200 * time.Millisecond + // deleteEvent and truncateEvent are the file events that can be sent in a + // StreamFrame deleteEvent = "file deleted" truncateEvent = "file truncated" + + // OriginStart and OriginEnd are the available parameters for the origin + // argument when streaming a file. They respectively offset from the start + // and end of a file. + OriginStart = "start" + OriginEnd = "end" ) func (s *HTTPServer) FsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { @@ -573,6 +582,12 @@ OUTER: // Send the frame if n != 0 { if err := framer.Send(path, lastEvent, data[:n], offset); err != nil { + + // Check if the connection has been closed + if err == io.ErrClosedPipe || strings.Contains(err.Error(), syscall.EPIPE.Error()) { + return syscall.EPIPE + } + return err } } @@ -773,13 +788,19 @@ func (s *HTTPServer) logs(follow bool, offset int64, p := filepath.Join(logPath, logEntry.Name) err = s.stream(openOffset, p, fs, framer, eofCancelCh) - // Check if there was an error where the file does not exist. That means - // it got rotated out from under us. if err != nil { + // Check if there was an error where the file does not exist. That means + // it got rotated out from under us. if os.IsNotExist(err) { continue } - return err + + // Check if the connection was closed + if err == syscall.EPIPE { + return nil + } + + return fmt.Errorf("failed to stream %q: %v", p, err) } if exitAfter { diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index ced3708af87d..519687fc3e62 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -2,6 +2,7 @@ package agent import ( "bytes" + "fmt" "io" "io/ioutil" "math" @@ -12,6 +13,7 @@ import ( "reflect" "strconv" "strings" + "syscall" "testing" "time" @@ -452,6 +454,10 @@ func tempAllocDir(t *testing.T) *allocdir.AllocDir { t.Fatalf("TempDir() failed: %v", err) } + if err := os.Chmod(dir, 0777); err != nil { + t.Fatalf("failed to chmod dir: %v", err) + } + return allocdir.NewAllocDir(dir) } @@ -745,6 +751,189 @@ func TestHTTP_Stream_Delete(t *testing.T) { }) } +func TestHTTP_Logs_NoFollow(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + // Get a temp alloc dir and create the log dir + ad := tempAllocDir(t) + defer os.RemoveAll(ad.AllocDir) + + logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName) + if err := os.MkdirAll(logDir, 0777); err != nil { + t.Fatalf("Failed to make log dir: %v", err) + } + + // Create a series of log files in the temp dir + task := "foo" + logType := "stdout" + expected := []byte("012") + for i := 0; i < 3; i++ { + logFile := fmt.Sprintf("%s.%s.%d", task, logType, i) + logFilePath := filepath.Join(logDir, logFile) + err := ioutil.WriteFile(logFilePath, expected[i:i+1], 777) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + } + + // Create a decoder + r, w := io.Pipe() + wrappedW := &WriteCloseChecker{WriteCloser: w} + defer r.Close() + defer w.Close() + dec := codec.NewDecoder(r, jsonHandle) + + var received []byte + + // Start the reader + resultCh := make(chan struct{}) + go func() { + for { + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + if err == io.EOF { + t.Logf("EOF") + return + } + + t.Fatalf("failed to decode: %v", err) + } + + if frame.IsHeartbeat() { + continue + } + + received = append(received, frame.Data...) + if reflect.DeepEqual(received, expected) { + close(resultCh) + return + } + } + }() + + // Start streaming logs + go func() { + if err := s.Server.logs(false, 0, OriginStart, task, logType, ad, wrappedW); err != nil { + t.Fatalf("logs() failed: %v", err) + } + }() + + select { + case <-resultCh: + case <-time.After(4 * streamBatchWindow): + t.Fatalf("did not receive data: got %q", string(received)) + } + + testutil.WaitForResult(func() (bool, error) { + return wrappedW.Closed, nil + }, func(err error) { + t.Fatalf("connection not closed") + }) + + }) +} + +func TestHTTP_Logs_Follow(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + // Get a temp alloc dir and create the log dir + ad := tempAllocDir(t) + defer os.RemoveAll(ad.AllocDir) + + logDir := filepath.Join(ad.SharedDir, allocdir.LogDirName) + if err := os.MkdirAll(logDir, 0777); err != nil { + t.Fatalf("Failed to make log dir: %v", err) + } + + // Create a series of log files in the temp dir + task := "foo" + logType := "stdout" + expected := []byte("012345") + initialWrites := 3 + + writeToFile := func(index int, data []byte) { + logFile := fmt.Sprintf("%s.%s.%d", task, logType, index) + logFilePath := filepath.Join(logDir, logFile) + err := ioutil.WriteFile(logFilePath, data, 777) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + } + for i := 0; i < initialWrites; i++ { + writeToFile(i, expected[i:i+1]) + } + + // Create a decoder + r, w := io.Pipe() + wrappedW := &WriteCloseChecker{WriteCloser: w} + defer r.Close() + defer w.Close() + dec := codec.NewDecoder(r, jsonHandle) + + var received []byte + + // Start the reader + firstResultCh := make(chan struct{}) + fullResultCh := make(chan struct{}) + go func() { + for { + var frame StreamFrame + if err := dec.Decode(&frame); err != nil { + if err == io.EOF { + t.Logf("EOF") + return + } + + t.Fatalf("failed to decode: %v", err) + } + + if frame.IsHeartbeat() { + continue + } + + received = append(received, frame.Data...) + if reflect.DeepEqual(received, expected[:initialWrites]) { + close(firstResultCh) + } else if reflect.DeepEqual(received, expected) { + close(fullResultCh) + return + } + } + }() + + // Start streaming logs + go func() { + if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil && err != syscall.EPIPE { + t.Fatalf("logs() failed: %v", err) + } + }() + + select { + case <-firstResultCh: + case <-time.After(4 * streamBatchWindow): + t.Fatalf("did not receive data: got %q", string(received)) + } + + // We got the first chunk of data, write out the rest to the file to + // check that it is following + writeToFile(initialWrites, expected[initialWrites:]) + + select { + case <-fullResultCh: + case <-time.After(4 * streamBatchWindow): + t.Fatalf("did not receive data: got %q", string(received)) + } + + // Close the reader + r.Close() + + testutil.WaitForResult(func() (bool, error) { + return wrappedW.Closed, nil + }, func(err error) { + t.Fatalf("connection not closed") + }) + + }) +} + func TestLogs_findClosest(t *testing.T) { task := "foo" entries := []*allocdir.AllocFileInfo{ From f3a71e01437f6d1e6217e060054176544f5e862e Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 20 Jul 2016 14:14:54 -0700 Subject: [PATCH 12/16] Handle skipping indexes --- command/agent/fs_endpoint.go | 84 +++++++++++++++++++++++++++---- command/agent/fs_endpoint_test.go | 12 ++--- 2 files changed, 79 insertions(+), 17 deletions(-) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index bc24de702b13..ab50481072d6 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -44,6 +44,12 @@ const ( // being flushed if the frame size has not been hit. streamBatchWindow = 200 * time.Millisecond + // nextLogCheckRate is the rate at which we check for a log entry greater + // than what we are watching for. This is to handle the case in which logs + // rotate faster than we can detect and we have to rely on a normal + // directory listing. + nextLogCheckRate = 100 * time.Millisecond + // deleteEvent and truncateEvent are the file events that can be sent in a // StreamFrame deleteEvent = "file deleted" @@ -781,8 +787,7 @@ func (s *HTTPServer) logs(follow bool, offset int64, close(eofCancelCh) exitAfter = true } else { - nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, idx+1)) - eofCancelCh = fs.BlockUntilExists(nextPath, &t) + eofCancelCh = blockUntilNextLog(fs, &t, logPath, task, logType, idx+1) } p := filepath.Join(logPath, logEntry.Name) @@ -815,6 +820,52 @@ func (s *HTTPServer) logs(follow bool, offset int64, return nil } +// blockUntilNextLog returns a channel that will have data sent when the next +// log index or anything greater is created. +func blockUntilNextLog(fs allocdir.AllocDirFS, t *tomb.Tomb, logPath, task, logType string, nextIndex int64) chan error { + nextPath := filepath.Join(logPath, fmt.Sprintf("%s.%s.%d", task, logType, nextIndex)) + next := make(chan error, 1) + + go func() { + eofCancelCh := fs.BlockUntilExists(nextPath, t) + scanCh := time.Tick(nextLogCheckRate) + for { + select { + case err := <-eofCancelCh: + next <- err + close(next) + return + case <-scanCh: + entries, err := fs.List(logPath) + if err != nil { + next <- fmt.Errorf("failed to list entries: %v", err) + close(next) + return + } + + indexes, err := logIndexes(entries, task, logType) + if err != nil { + next <- err + close(next) + return + } + + // Scan and see if there are any entries larger than what we are + // waiting for. + for _, entry := range indexes { + if entry.idx >= nextIndex { + next <- nil + close(next) + return + } + } + } + } + }() + + return next +} + // indexTuple and indexTupleArray are used to find the correct log entry to // start streaming logs from type indexTuple struct { @@ -828,14 +879,10 @@ func (a indexTupleArray) Len() int { return len(a) } func (a indexTupleArray) Less(i, j int) bool { return a[i].idx < a[j].idx } func (a indexTupleArray) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -// findClosest takes a list of entries, the desired log index and desired log -// offset (which can be negative, treated as offset from end), task name and log -// type and returns the log entry, the log index, the offset to read from and a -// potential error. -func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx, desiredOffset int64, - task, logType string) (*allocdir.AllocFileInfo, int64, int64, error) { - - // Build the matching indexes +// logIndexes takes a set of entries and returns a indexTupleArray of +// the desired log file entries. If the indexes could not be determined, an +// error is returned. +func logIndexes(entries []*allocdir.AllocFileInfo, task, logType string) (indexTupleArray, error) { var indexes []indexTuple prefix := fmt.Sprintf("%s.%s.", task, logType) for _, entry := range entries { @@ -852,12 +899,27 @@ func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx, desiredOffset in // Convert to an int idx, err := strconv.Atoi(idxStr) if err != nil { - return nil, 0, 0, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err) + return nil, fmt.Errorf("failed to convert %q to a log index: %v", idxStr, err) } indexes = append(indexes, indexTuple{idx: int64(idx), entry: entry}) } + return indexTupleArray(indexes), nil +} + +// findClosest takes a list of entries, the desired log index and desired log +// offset (which can be negative, treated as offset from end), task name and log +// type and returns the log entry, the log index, the offset to read from and a +// potential error. +func findClosest(entries []*allocdir.AllocFileInfo, desiredIdx, desiredOffset int64, + task, logType string) (*allocdir.AllocFileInfo, int64, int64, error) { + + // Build the matching indexes + indexes, err := logIndexes(entries, task, logType) + if err != nil { + return nil, 0, 0, err + } if len(indexes) == 0 { return nil, 0, 0, fmt.Errorf("log entry for task %q and log type %q not found", task, logType) } diff --git a/command/agent/fs_endpoint_test.go b/command/agent/fs_endpoint_test.go index 519687fc3e62..2659bb88cef2 100644 --- a/command/agent/fs_endpoint_test.go +++ b/command/agent/fs_endpoint_test.go @@ -13,7 +13,6 @@ import ( "reflect" "strconv" "strings" - "syscall" "testing" "time" @@ -901,7 +900,7 @@ func TestHTTP_Logs_Follow(t *testing.T) { // Start streaming logs go func() { - if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil && err != syscall.EPIPE { + if err := s.Server.logs(true, 0, OriginStart, task, logType, ad, wrappedW); err != nil { t.Fatalf("logs() failed: %v", err) } }() @@ -912,9 +911,11 @@ func TestHTTP_Logs_Follow(t *testing.T) { t.Fatalf("did not receive data: got %q", string(received)) } - // We got the first chunk of data, write out the rest to the file to - // check that it is following - writeToFile(initialWrites, expected[initialWrites:]) + // We got the first chunk of data, write out the rest to the next file + // at an index much ahead to check that it is following and detecting + // skips + skipTo := initialWrites + 10 + writeToFile(skipTo, expected[initialWrites:]) select { case <-fullResultCh: @@ -930,7 +931,6 @@ func TestHTTP_Logs_Follow(t *testing.T) { }, func(err error) { t.Fatalf("connection not closed") }) - }) } From 2a32e839acde7b9f76183d6f524c63495d663e8f Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 20 Jul 2016 15:18:54 -0700 Subject: [PATCH 13/16] Add logs command test --- command/agent/fs_endpoint.go | 12 ++++--- command/fs.go | 2 +- command/logs.go | 16 +++++---- command/logs_test.go | 65 ++++++++++++++++++++++++++++++++++++ 4 files changed, 84 insertions(+), 11 deletions(-) create mode 100644 command/logs_test.go diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index ab50481072d6..ccc552181127 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -311,10 +311,11 @@ func (s *StreamFramer) run() { // Store any error and mark it as not running var err error defer func() { - s.l.Lock() - s.Err = err close(s.exitCh) close(s.outbound) + + s.l.Lock() + s.Err = err s.running = false s.l.Unlock() }() @@ -338,8 +339,11 @@ func (s *StreamFramer) run() { // Read the data for the frame, and send it s.f.Data = s.readData() - s.outbound <- s.f - s.f = nil + select { + case s.outbound <- s.f: + s.f = nil + default: + } s.l.Unlock() case <-s.heartbeat.C: // Send a heartbeat frame diff --git a/command/fs.go b/command/fs.go index 66bce5bb659f..df1bbf91d08f 100644 --- a/command/fs.go +++ b/command/fs.go @@ -50,7 +50,7 @@ FS Specific Options: Show full information. -job - Use a random allocation from a specified job-id. + Use a random allocation from the specified job ID. -stat Show file stat information instead of displaying the file, or listing the directory. diff --git a/command/logs.go b/command/logs.go index e5aa6297a400..6f105202c0f5 100644 --- a/command/logs.go +++ b/command/logs.go @@ -24,7 +24,7 @@ Usage: nomad logs [options] General Options: - ` + generalOptionsUsage() + ` + ` + generalOptionsUsage() + ` Logs Specific Options: @@ -32,7 +32,7 @@ Logs Specific Options: Show full information. -job - Use a random allocation from a specified job-id. + Use a random allocation from the specified job ID. -f Causes the output to not stop when the end of the logs are reached, but @@ -60,7 +60,7 @@ func (l *LogsCommand) Run(args []string) int { var verbose, job, tail, stderr, follow bool var numLines, numBytes int64 - flags := l.Meta.FlagSet("logs-list", FlagSetClient) + flags := l.Meta.FlagSet("logs", FlagSetClient) flags.Usage = func() { l.Ui.Output(l.Help()) } flags.BoolVar(&verbose, "verbose", false, "") flags.BoolVar(&job, "job", false, "") @@ -75,13 +75,17 @@ func (l *LogsCommand) Run(args []string) int { } args = flags.Args() - if len(args) < 1 { + if numArgs := len(args); numArgs < 1 { if job { - l.Ui.Error("Job ID required") + l.Ui.Error("Job ID required. See help:\n") } else { - l.Ui.Error("Allocation ID required") + l.Ui.Error("Allocation ID required. See help:\n") } + l.Ui.Error(l.Help()) + return 1 + } else if numArgs > 2 { + l.Ui.Error(l.Help()) return 1 } diff --git a/command/logs_test.go b/command/logs_test.go new file mode 100644 index 000000000000..2400b6302d0b --- /dev/null +++ b/command/logs_test.go @@ -0,0 +1,65 @@ +package command + +import ( + "strings" + "testing" + + "github.com/mitchellh/cli" +) + +func TestLogsCommand_Implements(t *testing.T) { + var _ cli.Command = &LogsCommand{} +} + +func TestLogsCommand_Fails(t *testing.T) { + srv, _, url := testServer(t, nil) + defer srv.Stop() + + ui := new(cli.MockUi) + cmd := &LogsCommand{Meta: Meta{Ui: ui}} + + // Fails on misuse + if code := cmd.Run([]string{"some", "bad", "args"}); code != 1 { + t.Fatalf("expected exit code 1, got: %d", code) + } + if out := ui.ErrorWriter.String(); !strings.Contains(out, cmd.Help()) { + t.Fatalf("expected help output, got: %s", out) + } + ui.ErrorWriter.Reset() + + // Fails on connection failure + if code := cmd.Run([]string{"-address=nope", "foobar"}); code != 1 { + t.Fatalf("expected exit code 1, got: %d", code) + } + if out := ui.ErrorWriter.String(); !strings.Contains(out, "Error querying allocation") { + t.Fatalf("expected failed query error, got: %s", out) + } + ui.ErrorWriter.Reset() + + // Fails on missing alloc + if code := cmd.Run([]string{"-address=" + url, "26470238-5CF2-438F-8772-DC67CFB0705C"}); code != 1 { + t.Fatalf("expected exit 1, got: %d", code) + } + if out := ui.ErrorWriter.String(); !strings.Contains(out, "No allocation(s) with prefix or id") { + t.Fatalf("expected not found error, got: %s", out) + } + ui.ErrorWriter.Reset() + + // Fail on identifier with too few characters + if code := cmd.Run([]string{"-address=" + url, "2"}); code != 1 { + t.Fatalf("expected exit 1, got: %d", code) + } + if out := ui.ErrorWriter.String(); !strings.Contains(out, "must contain at least two characters.") { + t.Fatalf("expected too few characters error, got: %s", out) + } + ui.ErrorWriter.Reset() + + // Identifiers with uneven length should produce a query result + if code := cmd.Run([]string{"-address=" + url, "123"}); code != 1 { + t.Fatalf("expected exit 1, got: %d", code) + } + if out := ui.ErrorWriter.String(); !strings.Contains(out, "No allocation(s) with prefix or id") { + t.Fatalf("expected not found error, got: %s", out) + } + +} From cbcb32096fb0321a29b6c03ef81c255389ec7a6e Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 22 Jul 2016 15:01:56 -0700 Subject: [PATCH 14/16] small fixes --- command/agent/fs_endpoint.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index ccc552181127..0c811d8c9c83 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -546,9 +546,17 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf framer.Run() defer framer.Destroy() - return nil, s.stream(offset, path, fs, framer, nil) + err := s.stream(offset, path, fs, framer, nil) + if err != nil && err != syscall.EPIPE { + return nil, err + } + + return nil, nil } +// stream is the internal method to stream the content of a file. eofCancelCh is +// used to cancel the stream if triggered while at EOF. If the connection is +// broken an EPIPE error is returned func (s *HTTPServer) stream(offset int64, path string, fs allocdir.AllocDirFS, framer *StreamFramer, eofCancelCh chan error) error { @@ -805,7 +813,7 @@ func (s *HTTPServer) logs(follow bool, offset int64, } // Check if the connection was closed - if err == syscall.EPIPE { + if strings.Contains(err.Error(), syscall.EPIPE.Error()) { return nil } From 67fe93434214fb9361a4639ef2fbdb20f912dd21 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 22 Jul 2016 15:07:11 -0700 Subject: [PATCH 15/16] control-c works --- api/fs.go | 16 +++++++++++++++- command/agent/fs_endpoint.go | 2 +- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/api/fs.go b/api/fs.go index 70635302847f..dfb6b0f43ac5 100644 --- a/api/fs.go +++ b/api/fs.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "strconv" + "sync" "time" ) @@ -362,7 +363,9 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str type FrameReader struct { frames <-chan *StreamFrame cancelCh chan struct{} - closed bool + + closedLock sync.Mutex + closed bool unblockTime time.Duration @@ -395,6 +398,13 @@ func (f *FrameReader) Offset() int { // Read reads the data of the incoming frames into the bytes buffer. Returns EOF // when there are no more frames. func (f *FrameReader) Read(p []byte) (n int, err error) { + f.closedLock.Lock() + closed := f.closed + f.closedLock.Unlock() + if closed { + return 0, io.EOF + } + if f.frame == nil { var unblock <-chan time.Time if f.unblockTime.Nanoseconds() > 0 { @@ -412,6 +422,8 @@ func (f *FrameReader) Read(p []byte) (n int, err error) { f.byteOffset = int(f.frame.Offset) case <-unblock: return 0, nil + case <-f.cancelCh: + return 0, io.EOF } } @@ -430,6 +442,8 @@ func (f *FrameReader) Read(p []byte) (n int, err error) { // Close cancels the stream of frames func (f *FrameReader) Close() error { + f.closedLock.Lock() + defer f.closedLock.Unlock() if f.closed { return nil } diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index 0c811d8c9c83..f0f7dd0abaa4 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -546,7 +546,7 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf framer.Run() defer framer.Destroy() - err := s.stream(offset, path, fs, framer, nil) + err = s.stream(offset, path, fs, framer, nil) if err != nil && err != syscall.EPIPE { return nil, err } From 0b9449fe7e653ff8d922b972e6e2ef94582a4f8f Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 22 Jul 2016 16:02:46 -0700 Subject: [PATCH 16/16] better flush and connection closed handling --- command/agent/fs_endpoint.go | 35 +++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/command/agent/fs_endpoint.go b/command/agent/fs_endpoint.go index f0f7dd0abaa4..3a1ca361a972 100644 --- a/command/agent/fs_endpoint.go +++ b/command/agent/fs_endpoint.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "math" + "net" "net/http" "os" "path/filepath" @@ -312,9 +313,9 @@ func (s *StreamFramer) run() { var err error defer func() { close(s.exitCh) - close(s.outbound) s.l.Lock() + close(s.outbound) s.Err = err s.running = false s.l.Unlock() @@ -342,7 +343,7 @@ func (s *StreamFramer) run() { select { case s.outbound <- s.f: s.f = nil - default: + case <-s.exitCh: } s.l.Unlock() case <-s.heartbeat.C: @@ -369,13 +370,17 @@ OUTER: } // Flush any existing frames - select { - case o := <-s.outbound: - // Send the frame and then clear the current working frame - if err = s.enc.Encode(o); err != nil { - return +FLUSH: + for { + select { + case o := <-s.outbound: + // Send the frame and then clear the current working frame + if err = s.enc.Encode(o); err != nil { + return + } + default: + break FLUSH } - default: } s.l.Lock() @@ -602,10 +607,20 @@ OUTER: if err := framer.Send(path, lastEvent, data[:n], offset); err != nil { // Check if the connection has been closed - if err == io.ErrClosedPipe || strings.Contains(err.Error(), syscall.EPIPE.Error()) { + if err == io.ErrClosedPipe { + // The pipe check is for tests return syscall.EPIPE } + operr, ok := err.(*net.OpError) + if ok { + // The connection was closed by our peer + e := operr.Err.Error() + if strings.Contains(e, syscall.EPIPE.Error()) || strings.Contains(e, syscall.ECONNRESET.Error()) { + return syscall.EPIPE + } + } + return err } } @@ -813,7 +828,7 @@ func (s *HTTPServer) logs(follow bool, offset int64, } // Check if the connection was closed - if strings.Contains(err.Error(), syscall.EPIPE.Error()) { + if err == syscall.EPIPE { return nil }