Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix high CPU and FD usage on nomad logs #2079

Merged
merged 5 commits into from
Dec 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ IMPROVEMENTS:

BUG FIXES:
* agent/config: Fix use of IPv6 addresses [GH-2036]
* api: Fix file descriptor leak and high CPU usage when using the logs
endpoint [GH-2079]
* cli: Improve parsing error when a job without a name is specified [GH-2030]
* client: Fixed permissions of migrated allocation directory [GH-2061]
* client: Ensuring allocations are not blocked more than once [GH-2040]
Expand Down
153 changes: 86 additions & 67 deletions command/agent/fs_endpoint.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package agent

//go:generate codecgen -o fs_endpoint.generated.go fs_endpoint.go

import (
"bytes"
"fmt"
"io"
"math"
"net"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -204,6 +205,12 @@ func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request)
return nil, r.Close()
}

var (
// HeartbeatStreamFrame is the StreamFrame to send as a heartbeat, avoiding
// creating many instances of the empty StreamFrame
HeartbeatStreamFrame = &StreamFrame{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment

)

// StreamFrame is used to frame data of a file when streaming
type StreamFrame struct {
// Offset is the offset the data was read from
Expand All @@ -225,6 +232,27 @@ func (s *StreamFrame) IsHeartbeat() bool {
return s.Offset == 0 && len(s.Data) == 0 && s.File == "" && s.FileEvent == ""
}

// Clear resets the frame's Offset, Data, File and FileEvent fields to their
// zero values so the same StreamFrame value can be reused for a new frame.
func (s *StreamFrame) Clear() {
	s.Offset, s.Data = 0, nil
	s.File, s.FileEvent = "", ""
}

// IsCleared reports whether Offset, Data, File and FileEvent all hold their
// zero values, which is the state Clear leaves the frame in. Data must be nil:
// an empty but non-nil slice does not count as cleared.
func (s *StreamFrame) IsCleared() bool {
	return s.Offset == 0 && s.Data == nil && s.File == "" && s.FileEvent == ""
}

// StreamFramer is used to buffer and send frames as well as heartbeat.
type StreamFramer struct {
out io.WriteCloser
Expand All @@ -243,7 +271,7 @@ type StreamFramer struct {
l sync.Mutex

// The current working frame
f *StreamFrame
f StreamFrame
data *bytes.Buffer

// Captures whether the framer is running and any error that occurred to
Expand Down Expand Up @@ -328,32 +356,32 @@ OUTER:
case <-s.flusher.C:
// Skip if there is nothing to flush
s.l.Lock()
if s.f == nil {
if s.f.IsCleared() {
s.l.Unlock()
continue
}

// Read the data for the frame, and send it
s.f.Data = s.readData()
err = s.send(s.f)
s.f = nil
err = s.send(&s.f)
s.f.Clear()
s.l.Unlock()
if err != nil {
return
}
case <-s.heartbeat.C:
// Send a heartbeat frame
if err = s.send(&StreamFrame{}); err != nil {
if err = s.send(HeartbeatStreamFrame); err != nil {
return
}
}
}

s.l.Lock()
if s.f != nil {
if !s.f.IsCleared() {
s.f.Data = s.readData()
err = s.send(s.f)
s.f = nil
err = s.send(&s.f)
s.f.Clear()
}
s.l.Unlock()
}
Expand All @@ -378,9 +406,7 @@ func (s *StreamFramer) readData() []byte {
return nil
}
d := s.data.Next(size)
b := make([]byte, size)
copy(b, d)
return b
return d
}

// Send creates and sends a StreamFrame based on the passed parameters. An error
Expand All @@ -401,75 +427,62 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
}

// Check if not mergeable
if s.f != nil && (s.f.File != file || s.f.FileEvent != fileEvent) {
if !s.f.IsCleared() && (s.f.File != file || s.f.FileEvent != fileEvent) {
// Flush the old frame
f := *s.f
f.Data = s.readData()
s.f.Data = s.readData()
select {
case <-s.exitCh:
return nil
default:
}
err := s.send(&f)
s.f = nil
err := s.send(&s.f)
s.f.Clear()
if err != nil {
return err
}
}

// Store the new data as the current frame.
if s.f == nil {
s.f = &StreamFrame{
Offset: offset,
File: file,
FileEvent: fileEvent,
}
if s.f.IsCleared() {
s.f.Offset = offset
s.f.File = file
s.f.FileEvent = fileEvent
}

// Write the data to the buffer
s.data.Write(data)

// Handle the delete case in which there is no data
force := false
if s.data.Len() == 0 && s.f.FileEvent != "" {
select {
case <-s.exitCh:
return nil
default:
}

f := &StreamFrame{
Offset: s.f.Offset,
File: s.f.File,
FileEvent: s.f.FileEvent,
}
if err := s.send(f); err != nil {
return err
}
force = true
}

// Flush till we are under the max frame size
for s.data.Len() >= s.frameSize {
for s.data.Len() >= s.frameSize || force {
// Clear
if force {
force = false
}

// Create a new frame to send it
d := s.readData()
s.f.Data = s.readData()
select {
case <-s.exitCh:
return nil
default:
}

f := &StreamFrame{
Offset: s.f.Offset,
File: s.f.File,
FileEvent: s.f.FileEvent,
Data: d,
}
if err := s.send(f); err != nil {
if err := s.send(&s.f); err != nil {
return err
}

// Update the offset
s.f.Offset += int64(len(s.f.Data))
}

if s.data.Len() == 0 {
s.f = nil
s.f.Clear()
}

return nil
Expand Down Expand Up @@ -569,6 +582,26 @@ func (s *HTTPServer) stream(offset int64, path string,
t.Done()
}()

// parseFramerErr normalizes an error reported by the framer. A nil error is
// returned as nil. An error whose message contains the closed-pipe, EPIPE or
// ECONNRESET text is collapsed to syscall.EPIPE. Any other error is returned
// unchanged.
parseFramerErr := func(e error) error {
	if e == nil {
		return nil
	}

	// The checks below match on the message text, so compute it once.
	msg := e.Error()

	if strings.Contains(msg, io.ErrClosedPipe.Error()) {
		// The pipe check is for tests
		return syscall.EPIPE
	}

	// The connection was closed by our peer
	if strings.Contains(msg, syscall.EPIPE.Error()) || strings.Contains(msg, syscall.ECONNRESET.Error()) {
		return syscall.EPIPE
	}

	// Return the argument itself. Returning a variable captured from the
	// enclosing function here would replace the caller's error with an
	// unrelated (possibly nil) value.
	return e
}

// Create a variable to allow setting the last event
var lastEvent string

Expand All @@ -594,23 +627,7 @@ OUTER:
// Send the frame
if n != 0 {
if err := framer.Send(path, lastEvent, data[:n], offset); err != nil {

// Check if the connection has been closed
if err == io.ErrClosedPipe {
// The pipe check is for tests
return syscall.EPIPE
}

operr, ok := err.(*net.OpError)
if ok {
// The connection was closed by our peer
e := operr.Err.Error()
if strings.Contains(e, syscall.EPIPE.Error()) || strings.Contains(e, syscall.ECONNRESET.Error()) {
return syscall.EPIPE
}
}

return err
return parseFramerErr(err)
}
}

Expand All @@ -637,7 +654,7 @@ OUTER:
case <-changes.Modified:
continue OUTER
case <-changes.Deleted:
return framer.Send(path, deleteEvent, nil, offset)
return parseFramerErr(framer.Send(path, deleteEvent, nil, offset))
case <-changes.Truncated:
// Close the current reader
if err := f.Close(); err != nil {
Expand All @@ -657,7 +674,7 @@ OUTER:
lastEvent = truncateEvent
continue OUTER
case <-framer.ExitCh():
return nil
return parseFramerErr(framer.Err)
case err, ok := <-eofCancelCh:
if !ok {
return nil
Expand Down Expand Up @@ -850,7 +867,9 @@ func blockUntilNextLog(fs allocdir.AllocDirFS, t *tomb.Tomb, logPath, task, logT
return
}

scanCh := time.Tick(nextLogCheckRate)
ticker := time.NewTicker(nextLogCheckRate)
defer ticker.Stop()
scanCh := ticker.C
for {
select {
case <-t.Dead():
Expand Down
Loading