Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes race on StreamFramer Destroy #2007

Merged
merged 1 commit into from
Nov 28, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 63 additions & 72 deletions command/agent/fs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,18 @@ func (s *StreamFrame) IsHeartbeat() bool {

// StreamFramer is used to buffer and send frames as well as heartbeat.
type StreamFramer struct {
out io.WriteCloser
enc *codec.Encoder
frameSize int
heartbeat *time.Ticker
flusher *time.Ticker
out io.WriteCloser
enc *codec.Encoder
encLock sync.Mutex

frameSize int

heartbeat *time.Ticker
flusher *time.Ticker

shutdownCh chan struct{}
exitCh chan struct{}

outbound chan *StreamFrame

// The mutex protects everything below
l sync.Mutex

Expand Down Expand Up @@ -266,7 +268,6 @@ func NewStreamFramer(out io.WriteCloser, heartbeatRate, batchWindow time.Duratio
frameSize: frameSize,
heartbeat: heartbeat,
flusher: flusher,
outbound: make(chan *StreamFrame),
data: bytes.NewBuffer(make([]byte, 0, 2*frameSize)),
shutdownCh: make(chan struct{}),
exitCh: make(chan struct{}),
Expand All @@ -279,10 +280,11 @@ func (s *StreamFramer) Destroy() {
close(s.shutdownCh)
s.heartbeat.Stop()
s.flusher.Stop()
running := s.running
s.l.Unlock()

// Ensure things were flushed
if s.running {
if running {
<-s.exitCh
}
s.out.Close()
Expand All @@ -309,90 +311,60 @@ func (s *StreamFramer) ExitCh() <-chan struct{} {
// run is the internal run method. It exits if Destroy is called or an error
// occurs, in which case the exit channel is closed.
func (s *StreamFramer) run() {
// Store any error and mark it as not running
var err error
defer func() {
close(s.exitCh)

s.l.Lock()
close(s.outbound)
s.Err = err
s.running = false
s.Err = err
s.l.Unlock()
}()

// Start a heartbeat/flusher go-routine. This is done seprately to avoid blocking
// the outbound channel.
go func() {
for {
select {
case <-s.exitCh:
return
case <-s.shutdownCh:
return
case <-s.flusher.C:
// Skip if there is nothing to flush
s.l.Lock()
if s.f == nil {
s.l.Unlock()
continue
}

// Read the data for the frame, and send it
s.f.Data = s.readData()
select {
case s.outbound <- s.f:
s.f = nil
case <-s.exitCh:
}
s.l.Unlock()
case <-s.heartbeat.C:
// Send a heartbeat frame
s.l.Lock()
select {
case s.outbound <- &StreamFrame{}:
default:
}
s.l.Unlock()
}
}
}()

OUTER:
for {
select {
case <-s.shutdownCh:
break OUTER
case o := <-s.outbound:
// Send the frame
if err = s.enc.Encode(o); err != nil {
return
case <-s.flusher.C:
// Skip if there is nothing to flush
s.l.Lock()
if s.f == nil {
s.l.Unlock()
continue
}
}
}

// Flush any existing frames
FLUSH:
for {
select {
case o := <-s.outbound:
// Send the frame and then clear the current working frame
if err = s.enc.Encode(o); err != nil {
// Read the data for the frame, and send it
s.f.Data = s.readData()
err = s.send(s.f)
s.f = nil
s.l.Unlock()
if err != nil {
return
}
case <-s.heartbeat.C:
// Send a heartbeat frame
if err = s.send(&StreamFrame{}); err != nil {
return
}
default:
break FLUSH
}
}

s.l.Lock()
if s.f != nil {
s.f.Data = s.readData()
s.enc.Encode(s.f)
err = s.send(s.f)
s.f = nil
}
s.l.Unlock()
}

// send takes a StreamFrame, encodes and sends it
func (s *StreamFramer) send(f *StreamFrame) error {
s.encLock.Lock()
defer s.encLock.Unlock()
return s.enc.Encode(f)
}

// readData is a helper which reads the buffered data returning up to the frame
// size of data. Must be called with the lock held. The returned value is
// invalid on the next read or write into the StreamFramer buffer
Expand Down Expand Up @@ -424,6 +396,7 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
if s.Err != nil {
return s.Err
}

return fmt.Errorf("StreamFramer not running")
}

Expand All @@ -435,8 +408,12 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
select {
case <-s.exitCh:
return nil
case s.outbound <- &f:
s.f = nil
default:
}
err := s.send(&f)
s.f = nil
if err != nil {
return err
}
}

Expand All @@ -457,11 +434,16 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
select {
case <-s.exitCh:
return nil
case s.outbound <- &StreamFrame{
default:
}

f := &StreamFrame{
Offset: s.f.Offset,
File: s.f.File,
FileEvent: s.f.FileEvent,
}:
}
if err := s.send(f); err != nil {
return err
}
}

Expand All @@ -472,12 +454,17 @@ func (s *StreamFramer) Send(file, fileEvent string, data []byte, offset int64) e
select {
case <-s.exitCh:
return nil
case s.outbound <- &StreamFrame{
default:
}

f := &StreamFrame{
Offset: s.f.Offset,
File: s.f.File,
FileEvent: s.f.FileEvent,
Data: d,
}:
}
if err := s.send(f); err != nil {
return err
}
}

Expand Down Expand Up @@ -866,6 +853,10 @@ func blockUntilNextLog(fs allocdir.AllocDirFS, t *tomb.Tomb, logPath, task, logT
scanCh := time.Tick(nextLogCheckRate)
for {
select {
case <-t.Dead():
next <- fmt.Errorf("shutdown triggered")
close(next)
return
case err := <-eofCancelCh:
next <- err
close(next)
Expand Down