Skip to content

Commit

Permalink
Fix race in framer and improperly returned err
Browse files Browse the repository at this point in the history
Fixes #3342

Two bugs were fixed:

* Closing the StreamFramer's exitCh before setting the error means other
  goroutines blocked on exitCh closing could see the error as nil. This
  was *not* observered.
* parseFramerError on Windows would fall through and return an
  improperly captured nil err variable. There's no need for
  parseFramerError to be a closure which fixes the confusion.
  • Loading branch information
schmichael committed Dec 1, 2017
1 parent 88cb8bb commit ac3fffc
Showing 1 changed file with 34 additions and 27 deletions.
61 changes: 34 additions & 27 deletions command/agent/fs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ type StreamFramer struct {
// Captures whether the framer is running and any error that occurred to
// cause it to stop.
running bool
Err error
err error
}

// NewStreamFramer creates a new stream framer that will output StreamFrames to
Expand Down Expand Up @@ -379,16 +379,23 @@ func (s *StreamFramer) ExitCh() <-chan struct{} {
return s.exitCh
}

// Err returns the error that caused the StreamFramer to exit
func (s *StreamFramer) Err() error {
s.l.Lock()
defer s.l.Unlock()
return s.err
}

// run is the internal run method. It exits if Destroy is called or an error
// occurs, in which case the exit channel is closed.
func (s *StreamFramer) run() {
var err error
defer func() {
close(s.exitCh)
s.l.Lock()
s.running = false
s.Err = err
s.err = err
s.l.Unlock()
close(s.exitCh)
}()

OUTER:
Expand Down Expand Up @@ -466,8 +473,8 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
// If we are not running, return the error that caused us to not run or
// indicated that it was never started.
if !s.running {
if s.Err != nil {
return s.Err
if s.err != nil {
return s.err
}

return fmt.Errorf("StreamFramer not running")
Expand Down Expand Up @@ -608,6 +615,26 @@ func (s *HTTPServer) Stream(resp http.ResponseWriter, req *http.Request) (interf
return nil, nil
}

// parseFramerErr takes an error and returns an error. The error will
// potentially change if it was caused by the connection being closed.
func parseFramerErr(err error) error {
if err == nil {
return nil
}

if strings.Contains(err.Error(), io.ErrClosedPipe.Error()) {
// The pipe check is for tests
return syscall.EPIPE
}

// The connection was closed by our peer
if strings.Contains(err.Error(), syscall.EPIPE.Error()) || strings.Contains(err.Error(), syscall.ECONNRESET.Error()) {
return syscall.EPIPE
}

return err
}

// stream is the internal method to stream the content of a file. eofCancelCh is
// used to cancel the stream if triggered while at EOF. If the connection is
// broken an EPIPE error is returned
Expand All @@ -629,26 +656,6 @@ func (s *HTTPServer) stream(offset int64, path string,
t.Done()
}()

// parseFramerErr takes an error and returns an error. The error will
// potentially change if it was caused by the connection being closed.
parseFramerErr := func(e error) error {
if e == nil {
return nil
}

if strings.Contains(e.Error(), io.ErrClosedPipe.Error()) {
// The pipe check is for tests
return syscall.EPIPE
}

// The connection was closed by our peer
if strings.Contains(e.Error(), syscall.EPIPE.Error()) || strings.Contains(e.Error(), syscall.ECONNRESET.Error()) {
return syscall.EPIPE
}

return err
}

// Create a variable to allow setting the last event
var lastEvent string

Expand Down Expand Up @@ -721,7 +728,7 @@ OUTER:
lastEvent = truncateEvent
continue OUTER
case <-framer.ExitCh():
return parseFramerErr(framer.Err)
return parseFramerErr(framer.Err())
case err, ok := <-eofCancelCh:
if !ok {
return nil
Expand Down Expand Up @@ -916,7 +923,7 @@ func (s *HTTPServer) logs(follow, plain bool, offset int64,
return nil
}

//Since we successfully streamed, update the overall offset/idx.
// Since we successfully streamed, update the overall offset/idx.
offset = int64(0)
nextIdx = idx + 1
}
Expand Down

0 comments on commit ac3fffc

Please sign in to comment.