From 1b34922d97043e54692d789e62cbf938e5200ee3 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 30 Apr 2018 16:39:02 -0700 Subject: [PATCH] framer: fix race and remove unused error var In the old code `sending` in the `send()` method shared the Data slice's underlying backing array with its caller. Clearing StreamFrame.Data didn't break the reference from the sent frame to the StreamFramer's data slice. --- client/fs_endpoint.go | 10 +-- client/lib/streamframer/framer.go | 110 +++++++++++++----------------- 2 files changed, 52 insertions(+), 68 deletions(-) diff --git a/client/fs_endpoint.go b/client/fs_endpoint.go index 62423ad52a1f..904c5cf49e8d 100644 --- a/client/fs_endpoint.go +++ b/client/fs_endpoint.go @@ -423,6 +423,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { for { if _, err := conn.Read(nil); err != nil { if err == io.EOF || err == io.ErrClosedPipe { + // One end of the pipe was explicitly closed, exit cleanly cancel() return } @@ -579,12 +580,7 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in // #3342 select { case <-framer.ExitCh(): - err := parseFramerErr(framer.Err()) - if err == syscall.EPIPE { - // EPIPE just means the connection was closed - return nil - } - return err + return nil default: } @@ -708,7 +704,7 @@ OUTER: lastEvent = truncateEvent continue OUTER case <-framer.ExitCh(): - return parseFramerErr(framer.Err()) + return nil case <-ctx.Done(): return nil case err, ok := <-eofCancelCh: diff --git a/client/lib/streamframer/framer.go b/client/lib/streamframer/framer.go index b0caa4a047b4..5f2eeb626c2d 100644 --- a/client/lib/streamframer/framer.go +++ b/client/lib/streamframer/framer.go @@ -55,6 +55,14 @@ func (s *StreamFrame) IsCleared() bool { } } +func (s *StreamFrame) Copy() *StreamFrame { + n := new(StreamFrame) + *n = *s + n.Data = make([]byte, len(s.Data)) + copy(n.Data, s.Data) + return n +} + // StreamFramer is used to buffer and send frames as well as heartbeat. type StreamFramer struct { out chan<- *StreamFrame @@ -64,21 +72,24 @@ type StreamFramer struct { heartbeat *time.Ticker flusher *time.Ticker - shutdown bool + // shutdown is true when a shutdown is triggered + shutdown bool + + // shutdownCh is closed when a shutdown is triggered shutdownCh chan struct{} - exitCh chan struct{} + + // exitCh is closed when the run() goroutine exits + exitCh chan struct{} // The mutex protects everything below l sync.Mutex // The current working frame - f StreamFrame + f *StreamFrame data *bytes.Buffer - // Captures whether the framer is running and any error that occurred to - // cause it to stop. + // Captures whether the framer is running running bool - err error } // NewStreamFramer creates a new stream framer that will output StreamFrames to @@ -95,6 +106,7 @@ func NewStreamFramer(out chan<- *StreamFrame, frameSize: frameSize, heartbeat: heartbeat, flusher: flusher, + f: new(StreamFrame), data: bytes.NewBuffer(make([]byte, 0, 2*frameSize)), shutdownCh: make(chan struct{}), exitCh: make(chan struct{}), @@ -110,6 +122,7 @@ func (s *StreamFramer) Destroy() { if !wasShutdown { close(s.shutdownCh) + close(s.out) } s.heartbeat.Stop() @@ -121,9 +134,6 @@ func (s *StreamFramer) Destroy() { if running { <-s.exitCh } - if !wasShutdown { - close(s.out) - } } // Run starts a long lived goroutine that handles sending data as well as @@ -144,24 +154,20 @@ func (s *StreamFramer) ExitCh() <-chan struct{} { return s.exitCh } -// Err returns the error that caused the StreamFramer to exit -func (s *StreamFramer) Err() error { - s.l.Lock() - defer s.l.Unlock() - return s.err -} - // run is the internal run method. It exits if Destroy is called or an error // occurs, in which case the exit channel is closed. func (s *StreamFramer) run() { - var err error defer func() { s.l.Lock() s.running = false - s.err = err s.l.Unlock() close(s.exitCh) }() + defer func() { + if r := recover(); r != nil { + fmt.Println("Recovered in f", r) + } + }() OUTER: for { @@ -177,40 +183,38 @@ OUTER: } // Read the data for the frame, and send it - s.f.Data = s.readData() - err = s.send(&s.f) - s.f.Clear() + s.send() s.l.Unlock() - if err != nil { - return - } case <-s.heartbeat.C: // Send a heartbeat frame - if err = s.send(HeartbeatStreamFrame); err != nil { - return + select { + case s.out <- HeartbeatStreamFrame: + case <-s.shutdownCh: } } } s.l.Lock() if !s.f.IsCleared() { - s.f.Data = s.readData() - err = s.send(&s.f) - s.f.Clear() + s.send() } s.l.Unlock() } // send takes a StreamFrame, encodes and sends it -func (s *StreamFramer) send(f *StreamFrame) error { - sending := *f - f.Data = nil +func (s *StreamFramer) send() { + // Ensure s.out has not already been closd by Destroy + select { + case <-s.shutdownCh: + return + default: + } + s.f.Data = s.readData() select { - case s.out <- &sending: - return nil - case <-s.exitCh: - return nil + case s.out <- s.f.Copy(): + s.f.Clear() + case <-s.shutdownCh: } } @@ -236,31 +240,16 @@ func (s *StreamFramer) readData() []byte { func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) error { s.l.Lock() defer s.l.Unlock() - // If we are not running, return the error that caused us to not run or // indicated that it was never started. if !s.running { - if s.err != nil { - return s.err - } - return fmt.Errorf("StreamFramer not running") } // Check if not mergeable if !s.f.IsCleared() && (s.f.File != file || s.f.FileEvent != fileEvent) { // Flush the old frame - s.f.Data = s.readData() - select { - case <-s.exitCh: - return nil - default: - } - err := s.send(&s.f) - s.f.Clear() - if err != nil { - return err - } + s.send() } // Store the new data as the current frame. @@ -285,25 +274,24 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e force = false } - // Create a new frame to send it - s.f.Data = s.readData() + // Ensure s.out has not already been closd by Destroy select { - case <-s.exitCh: + case <-s.shutdownCh: return nil default: } - if err := s.send(&s.f); err != nil { - return err + // Create a new frame to send it + s.f.Data = s.readData() + select { + case s.out <- s.f.Copy(): + case <-s.shutdownCh: + return nil } // Update the offset s.f.Offset += int64(len(s.f.Data)) } - if s.data.Len() == 0 { - s.f.Clear() - } - return nil }