Skip to content

Commit

Permalink
framer: fix race and remove unused error var
Browse files Browse the repository at this point in the history
In the old code `sending` in the `send()` method shared the Data slice's
underlying backing array with its caller. Clearing StreamFrame.Data
didn't break the reference from the sent frame to the StreamFramer's
data slice.
  • Loading branch information
schmichael committed May 2, 2018
1 parent 8776cf8 commit 361db26
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 68 deletions.
10 changes: 3 additions & 7 deletions client/fs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
for {
if _, err := conn.Read(nil); err != nil {
if err == io.EOF || err == io.ErrClosedPipe {
// One end of the pipe was explicitly closed, exit cleanly
cancel()
return
}
Expand Down Expand Up @@ -579,12 +580,7 @@ func (f *FileSystem) logsImpl(ctx context.Context, follow, plain bool, offset in
// #3342
select {
case <-framer.ExitCh():
err := parseFramerErr(framer.Err())
if err == syscall.EPIPE {
// EPIPE just means the connection was closed
return nil
}
return err
return nil
default:
}

Expand Down Expand Up @@ -708,7 +704,7 @@ OUTER:
lastEvent = truncateEvent
continue OUTER
case <-framer.ExitCh():
return parseFramerErr(framer.Err())
return nil
case <-ctx.Done():
return nil
case err, ok := <-eofCancelCh:
Expand Down
110 changes: 49 additions & 61 deletions client/lib/streamframer/framer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ func (s *StreamFrame) IsCleared() bool {
}
}

func (s *StreamFrame) Copy() *StreamFrame {
n := new(StreamFrame)
*n = *s
n.Data = make([]byte, len(s.Data))
copy(n.Data, s.Data)
return n
}

// StreamFramer is used to buffer and send frames as well as heartbeat.
type StreamFramer struct {
out chan<- *StreamFrame
Expand All @@ -64,21 +72,24 @@ type StreamFramer struct {
heartbeat *time.Ticker
flusher *time.Ticker

shutdown bool
// shutdown is true when a shutdown is triggered
shutdown bool

// shutdownCh is closed when a shutdown is triggered
shutdownCh chan struct{}
exitCh chan struct{}

// exitCh is closed when the run() goroutine exits
exitCh chan struct{}

// The mutex protects everything below
l sync.Mutex

// The current working frame
f StreamFrame
f *StreamFrame
data *bytes.Buffer

// Captures whether the framer is running and any error that occurred to
// cause it to stop.
// Captures whether the framer is running
running bool
err error
}

// NewStreamFramer creates a new stream framer that will output StreamFrames to
Expand All @@ -95,6 +106,7 @@ func NewStreamFramer(out chan<- *StreamFrame,
frameSize: frameSize,
heartbeat: heartbeat,
flusher: flusher,
f: new(StreamFrame),
data: bytes.NewBuffer(make([]byte, 0, 2*frameSize)),
shutdownCh: make(chan struct{}),
exitCh: make(chan struct{}),
Expand All @@ -110,6 +122,7 @@ func (s *StreamFramer) Destroy() {

if !wasShutdown {
close(s.shutdownCh)
close(s.out)
}

s.heartbeat.Stop()
Expand All @@ -121,9 +134,6 @@ func (s *StreamFramer) Destroy() {
if running {
<-s.exitCh
}
if !wasShutdown {
close(s.out)
}
}

// Run starts a long lived goroutine that handles sending data as well as
Expand All @@ -144,24 +154,20 @@ func (s *StreamFramer) ExitCh() <-chan struct{} {
return s.exitCh
}

// Err returns the error that caused the StreamFramer to exit
func (s *StreamFramer) Err() error {
s.l.Lock()
defer s.l.Unlock()
return s.err
}

// run is the internal run method. It exits if Destroy is called or an error
// occurs, in which case the exit channel is closed.
func (s *StreamFramer) run() {
var err error
defer func() {
s.l.Lock()
s.running = false
s.err = err
s.l.Unlock()
close(s.exitCh)
}()
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered in f", r)
}
}()

OUTER:
for {
Expand All @@ -177,40 +183,38 @@ OUTER:
}

// Read the data for the frame, and send it
s.f.Data = s.readData()
err = s.send(&s.f)
s.f.Clear()
s.send()
s.l.Unlock()
if err != nil {
return
}
case <-s.heartbeat.C:
// Send a heartbeat frame
if err = s.send(HeartbeatStreamFrame); err != nil {
return
select {
case s.out <- HeartbeatStreamFrame:
case <-s.shutdownCh:
}
}
}

s.l.Lock()
if !s.f.IsCleared() {
s.f.Data = s.readData()
err = s.send(&s.f)
s.f.Clear()
s.send()
}
s.l.Unlock()
}

// send takes a StreamFrame, encodes and sends it
func (s *StreamFramer) send(f *StreamFrame) error {
sending := *f
f.Data = nil
func (s *StreamFramer) send() {
// Ensure s.out has not already been closd by Destroy
select {
case <-s.shutdownCh:
return
default:
}

s.f.Data = s.readData()
select {
case s.out <- &sending:
return nil
case <-s.exitCh:
return nil
case s.out <- s.f.Copy():
s.f.Clear()
case <-s.shutdownCh:
}
}

Expand All @@ -236,31 +240,16 @@ func (s *StreamFramer) readData() []byte {
func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) error {
s.l.Lock()
defer s.l.Unlock()

// If we are not running, return the error that caused us to not run or
// indicated that it was never started.
if !s.running {
if s.err != nil {
return s.err
}

return fmt.Errorf("StreamFramer not running")
}

// Check if not mergeable
if !s.f.IsCleared() && (s.f.File != file || s.f.FileEvent != fileEvent) {
// Flush the old frame
s.f.Data = s.readData()
select {
case <-s.exitCh:
return nil
default:
}
err := s.send(&s.f)
s.f.Clear()
if err != nil {
return err
}
s.send()
}

// Store the new data as the current frame.
Expand All @@ -285,25 +274,24 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
force = false
}

// Create a new frame to send it
s.f.Data = s.readData()
// Ensure s.out has not already been closd by Destroy
select {
case <-s.exitCh:
case <-s.shutdownCh:
return nil
default:
}

if err := s.send(&s.f); err != nil {
return err
// Create a new frame to send it
s.f.Data = s.readData()
select {
case s.out <- s.f.Copy():
case <-s.shutdownCh:
return nil
}

// Update the offset
s.f.Offset += int64(len(s.f.Data))
}

if s.data.Len() == 0 {
s.f.Clear()
}

return nil
}

0 comments on commit 361db26

Please sign in to comment.