Skip to content

Commit

Permalink
Merge pull request #2079 from hashicorp/b-logs
Browse files Browse the repository at this point in the history
Fix high CPU and FD usage on `nomad logs`
  • Loading branch information
dadgar committed Dec 10, 2016
2 parents ae2b0f5 + b862dd6 commit d9cb132
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 70 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ IMPROVEMENTS:

BUG FIXES:
* agent/config: Fix use of IPv6 addresses [GH-2036]
* api: Fix file descriptor leak and high CPU usage when using the logs
endpoint [GH-2079]
* cli: Improve parsing error when a job without a name is specified [GH-2030]
* client: Fixed permissions of migrated allocation directory [GH-2061]
* client: Ensuring allocations are not blocked more than once [GH-2040]
Expand Down
153 changes: 86 additions & 67 deletions command/agent/fs_endpoint.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package agent

//go:generate codecgen -o fs_endpoint.generated.go fs_endpoint.go

import (
"bytes"
"fmt"
"io"
"math"
"net"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -204,6 +205,12 @@ func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request)
return nil, r.Close()
}

var (
// HeartbeatStreamFrame is the StreamFrame to send as a heartbeat, avoiding
// creating many instances of the empty StreamFrame
HeartbeatStreamFrame = &StreamFrame{}
)

// StreamFrame is used to frame data of a file when streaming
type StreamFrame struct {
// Offset is the offset the data was read from
Expand All @@ -225,6 +232,27 @@ func (s *StreamFrame) IsHeartbeat() bool {
return s.Offset == 0 && len(s.Data) == 0 && s.File == "" && s.FileEvent == ""
}

func (s *StreamFrame) Clear() {
s.Offset = 0
s.Data = nil
s.File = ""
s.FileEvent = ""
}

func (s *StreamFrame) IsCleared() bool {
if s.Offset != 0 {
return false
} else if s.Data != nil {
return false
} else if s.File != "" {
return false
} else if s.FileEvent != "" {
return false
} else {
return true
}
}

// StreamFramer is used to buffer and send frames as well as heartbeat.
type StreamFramer struct {
out io.WriteCloser
Expand All @@ -243,7 +271,7 @@ type StreamFramer struct {
l sync.Mutex

// The current working frame
f *StreamFrame
f StreamFrame
data *bytes.Buffer

// Captures whether the framer is running and any error that occurred to
Expand Down Expand Up @@ -328,32 +356,32 @@ OUTER:
case <-s.flusher.C:
// Skip if there is nothing to flush
s.l.Lock()
if s.f == nil {
if s.f.IsCleared() {
s.l.Unlock()
continue
}

// Read the data for the frame, and send it
s.f.Data = s.readData()
err = s.send(s.f)
s.f = nil
err = s.send(&s.f)
s.f.Clear()
s.l.Unlock()
if err != nil {
return
}
case <-s.heartbeat.C:
// Send a heartbeat frame
if err = s.send(&StreamFrame{}); err != nil {
if err = s.send(HeartbeatStreamFrame); err != nil {
return
}
}
}

s.l.Lock()
if s.f != nil {
if !s.f.IsCleared() {
s.f.Data = s.readData()
err = s.send(s.f)
s.f = nil
err = s.send(&s.f)
s.f.Clear()
}
s.l.Unlock()
}
Expand All @@ -378,9 +406,7 @@ func (s *StreamFramer) readData() []byte {
return nil
}
d := s.data.Next(size)
b := make([]byte, size)
copy(b, d)
return b
return d
}

// Send creates and sends a StreamFrame based on the passed parameters. An error
Expand All @@ -401,75 +427,62 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
}

// Check if not mergeable
if s.f != nil && (s.f.File != file || s.f.FileEvent != fileEvent) {
if !s.f.IsCleared() && (s.f.File != file || s.f.FileEvent != fileEvent) {
// Flush the old frame
f := *s.f
f.Data = s.readData()
s.f.Data = s.readData()
select {
case <-s.exitCh:
return nil
default:
}
err := s.send(&f)
s.f = nil
err := s.send(&s.f)
s.f.Clear()
if err != nil {
return err
}
}

// Store the new data as the current frame.
if s.f == nil {
s.f = &StreamFrame{
Offset: offset,
File: file,
FileEvent: fileEvent,
}
if s.f.IsCleared() {
s.f.Offset = offset
s.f.File = file
s.f.FileEvent = fileEvent
}

// Write the data to the buffer
s.data.Write(data)

// Handle the delete case in which there is no data
force := false
if s.data.Len() == 0 && s.f.FileEvent != "" {
select {
case <-s.exitCh:
return nil
default:
}

f := &StreamFrame{
Offset: s.f.Offset,
File: s.f.File,
FileEvent: s.f.FileEvent,
}
if err := s.send(f); err != nil {
return err
}
force = true
}

// Flush till we are under the max frame size
for s.data.Len() >= s.frameSize {
for s.data.Len() >= s.frameSize || force {
// Clear
if force {
force = false
}

// Create a new frame to send it
d := s.readData()
s.f.Data = s.readData()
select {
case <-s.exitCh:
return nil
default:
}

f := &StreamFrame{
Offset: s.f.Offset,
File: s.f.File,
FileEvent: s.f.FileEvent,
Data: d,
}
if err := s.send(f); err != nil {
if err := s.send(&s.f); err != nil {
return err
}

// Update the offset
s.f.Offset += int64(len(s.f.Data))
}

if s.data.Len() == 0 {
s.f = nil
s.f.Clear()
}

return nil
Expand Down Expand Up @@ -569,6 +582,26 @@ func (s *HTTPServer) stream(offset int64, path string,
t.Done()
}()

// parseFramerErr takes an error and returns an error. The error will
// potentially change if it was caused by the connection being closed.
parseFramerErr := func(e error) error {
if e == nil {
return nil
}

if strings.Contains(e.Error(), io.ErrClosedPipe.Error()) {
// The pipe check is for tests
return syscall.EPIPE
}

// The connection was closed by our peer
if strings.Contains(e.Error(), syscall.EPIPE.Error()) || strings.Contains(e.Error(), syscall.ECONNRESET.Error()) {
return syscall.EPIPE
}

return err
}

// Create a variable to allow setting the last event
var lastEvent string

Expand All @@ -594,23 +627,7 @@ OUTER:
// Send the frame
if n != 0 {
if err := framer.Send(path, lastEvent, data[:n], offset); err != nil {

// Check if the connection has been closed
if err == io.ErrClosedPipe {
// The pipe check is for tests
return syscall.EPIPE
}

operr, ok := err.(*net.OpError)
if ok {
// The connection was closed by our peer
e := operr.Err.Error()
if strings.Contains(e, syscall.EPIPE.Error()) || strings.Contains(e, syscall.ECONNRESET.Error()) {
return syscall.EPIPE
}
}

return err
return parseFramerErr(err)
}
}

Expand All @@ -637,7 +654,7 @@ OUTER:
case <-changes.Modified:
continue OUTER
case <-changes.Deleted:
return framer.Send(path, deleteEvent, nil, offset)
return parseFramerErr(framer.Send(path, deleteEvent, nil, offset))
case <-changes.Truncated:
// Close the current reader
if err := f.Close(); err != nil {
Expand All @@ -657,7 +674,7 @@ OUTER:
lastEvent = truncateEvent
continue OUTER
case <-framer.ExitCh():
return nil
return parseFramerErr(framer.Err)
case err, ok := <-eofCancelCh:
if !ok {
return nil
Expand Down Expand Up @@ -850,7 +867,9 @@ func blockUntilNextLog(fs allocdir.AllocDirFS, t *tomb.Tomb, logPath, task, logT
return
}

scanCh := time.Tick(nextLogCheckRate)
ticker := time.NewTicker(nextLogCheckRate)
defer ticker.Stop()
scanCh := ticker.C
for {
select {
case <-t.Dead():
Expand Down
Loading

0 comments on commit d9cb132

Please sign in to comment.