Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix high CPU and FD usage on nomad logs #2079

Merged
merged 5 commits into from
Dec 10, 2016
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 84 additions & 67 deletions command/agent/fs_endpoint.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package agent

//go:generate codecgen -o fs_endpoint.generated.go fs_endpoint.go

import (
"bytes"
"fmt"
"io"
"math"
"net"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -204,6 +205,10 @@ func (s *HTTPServer) FileCatRequest(resp http.ResponseWriter, req *http.Request)
return nil, r.Close()
}

var (
HeartbeatStreamFrame = &StreamFrame{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment

)

// StreamFrame is used to frame data of a file when streaming
type StreamFrame struct {
// Offset is the offset the data was read from
Expand All @@ -225,6 +230,27 @@ func (s *StreamFrame) IsHeartbeat() bool {
return s.Offset == 0 && len(s.Data) == 0 && s.File == "" && s.FileEvent == ""
}

func (s *StreamFrame) Clear() {
s.Offset = 0
s.Data = nil
s.File = ""
s.FileEvent = ""
}

func (s *StreamFrame) IsCleared() bool {
if s.Offset != 0 {
return false
} else if s.Data != nil {
return false
} else if s.File != "" {
return false
} else if s.FileEvent != "" {
return false
} else {
return true
}
}

// StreamFramer is used to buffer and send frames as well as heartbeat.
type StreamFramer struct {
out io.WriteCloser
Expand All @@ -243,7 +269,7 @@ type StreamFramer struct {
l sync.Mutex

// The current working frame
f *StreamFrame
f StreamFrame
data *bytes.Buffer

// Captures whether the framer is running and any error that occurred to
Expand Down Expand Up @@ -328,32 +354,32 @@ OUTER:
case <-s.flusher.C:
// Skip if there is nothing to flush
s.l.Lock()
if s.f == nil {
if s.f.IsCleared() {
s.l.Unlock()
continue
}

// Read the data for the frame, and send it
s.f.Data = s.readData()
err = s.send(s.f)
s.f = nil
err = s.send(&s.f)
s.f.Clear()
s.l.Unlock()
if err != nil {
return
}
case <-s.heartbeat.C:
// Send a heartbeat frame
if err = s.send(&StreamFrame{}); err != nil {
if err = s.send(HeartbeatStreamFrame); err != nil {
return
}
}
}

s.l.Lock()
if s.f != nil {
if !s.f.IsCleared() {
s.f.Data = s.readData()
err = s.send(s.f)
s.f = nil
err = s.send(&s.f)
s.f.Clear()
}
s.l.Unlock()
}
Expand All @@ -378,9 +404,7 @@ func (s *StreamFramer) readData() []byte {
return nil
}
d := s.data.Next(size)
b := make([]byte, size)
copy(b, d)
return b
return d
}

// Send creates and sends a StreamFrame based on the passed parameters. An error
Expand All @@ -401,75 +425,62 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
}

// Check if not mergeable
if s.f != nil && (s.f.File != file || s.f.FileEvent != fileEvent) {
if !s.f.IsCleared() && (s.f.File != file || s.f.FileEvent != fileEvent) {
// Flush the old frame
f := *s.f
f.Data = s.readData()
s.f.Data = s.readData()
select {
case <-s.exitCh:
return nil
default:
}
err := s.send(&f)
s.f = nil
err := s.send(&s.f)
s.f.Clear()
if err != nil {
return err
}
}

// Store the new data as the current frame.
if s.f == nil {
s.f = &StreamFrame{
Offset: offset,
File: file,
FileEvent: fileEvent,
}
if s.f.IsCleared() {
s.f.Offset = offset
s.f.File = file
s.f.FileEvent = fileEvent
}

// Write the data to the buffer
s.data.Write(data)

// Handle the delete case in which there is no data
force := false
if s.data.Len() == 0 && s.f.FileEvent != "" {
select {
case <-s.exitCh:
return nil
default:
}

f := &StreamFrame{
Offset: s.f.Offset,
File: s.f.File,
FileEvent: s.f.FileEvent,
}
if err := s.send(f); err != nil {
return err
}
force = true
}

// Flush till we are under the max frame size
for s.data.Len() >= s.frameSize {
for s.data.Len() >= s.frameSize || force {
// Clear
if force {
force = false
}

// Create a new frame to send it
d := s.readData()
s.f.Data = s.readData()
select {
case <-s.exitCh:
return nil
default:
}

f := &StreamFrame{
Offset: s.f.Offset,
File: s.f.File,
FileEvent: s.f.FileEvent,
Data: d,
}
if err := s.send(f); err != nil {
if err := s.send(&s.f); err != nil {
return err
}

// Update the offset
s.f.Offset += int64(len(s.f.Data))
}

if s.data.Len() == 0 {
s.f = nil
s.f.Clear()
}

return nil
Expand Down Expand Up @@ -569,6 +580,26 @@ func (s *HTTPServer) stream(offset int64, path string,
t.Done()
}()

// parseFramerErr takes an error and returns an error. The error will
// potentially change if it was caused by the connection being closed.
parseFramerErr := func(e error) error {
if e == nil {
return nil
}

if strings.Contains(e.Error(), io.ErrClosedPipe.Error()) {
// The pipe check is for tests
return syscall.EPIPE
}

// The connection was closed by our peer
if strings.Contains(e.Error(), syscall.EPIPE.Error()) || strings.Contains(e.Error(), syscall.ECONNRESET.Error()) {
return syscall.EPIPE
}

return err
}

// Create a variable to allow setting the last event
var lastEvent string

Expand All @@ -594,23 +625,7 @@ OUTER:
// Send the frame
if n != 0 {
if err := framer.Send(path, lastEvent, data[:n], offset); err != nil {

// Check if the connection has been closed
if err == io.ErrClosedPipe {
// The pipe check is for tests
return syscall.EPIPE
}

operr, ok := err.(*net.OpError)
if ok {
// The connection was closed by our peer
e := operr.Err.Error()
if strings.Contains(e, syscall.EPIPE.Error()) || strings.Contains(e, syscall.ECONNRESET.Error()) {
return syscall.EPIPE
}
}

return err
return parseFramerErr(err)
}
}

Expand All @@ -637,7 +652,7 @@ OUTER:
case <-changes.Modified:
continue OUTER
case <-changes.Deleted:
return framer.Send(path, deleteEvent, nil, offset)
return parseFramerErr(framer.Send(path, deleteEvent, nil, offset))
case <-changes.Truncated:
// Close the current reader
if err := f.Close(); err != nil {
Expand All @@ -657,7 +672,7 @@ OUTER:
lastEvent = truncateEvent
continue OUTER
case <-framer.ExitCh():
return nil
return parseFramerErr(framer.Err)
case err, ok := <-eofCancelCh:
if !ok {
return nil
Expand Down Expand Up @@ -850,7 +865,9 @@ func blockUntilNextLog(fs allocdir.AllocDirFS, t *tomb.Tomb, logPath, task, logT
return
}

scanCh := time.Tick(nextLogCheckRate)
ticker := time.NewTicker(nextLogCheckRate)
defer ticker.Stop()
scanCh := ticker.C
for {
select {
case <-t.Dead():
Expand Down
Loading