Skip to content

Commit

Permalink
Merge pull request #5248 from hashicorp/b-rawexec-leak
Browse files Browse the repository at this point in the history
Fix leaked executor in raw_exec
  • Loading branch information
nickethier committed Jan 29, 2019
2 parents 1d94e8a + d6ada19 commit 14c5a1c
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 4 deletions.
9 changes: 6 additions & 3 deletions client/pluginmanager/drivermanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,12 @@ func (m *manager) storePluginReattachConfig(id loader.PluginID, c *plugin.Reatta
m.reattachConfigLock.Lock()
defer m.reattachConfigLock.Unlock()

// Store the new reattach config
m.reattachConfigs[id] = pstructs.ReattachConfigFromGoPlugin(c)

if c == nil {
delete(m.reattachConfigs, id)
} else {
// Store the new reattach config
m.reattachConfigs[id] = pstructs.ReattachConfigFromGoPlugin(c)
}
// Persist the state
s := &state.PluginState{
ReattachConfigs: make(map[string]*pstructs.ReattachConfig, len(m.reattachConfigs)),
Expand Down
15 changes: 14 additions & 1 deletion command/executor_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,17 @@ import (
plugin "github.com/hashicorp/go-plugin"

"github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/lib/circbufwriter"
"github.com/hashicorp/nomad/plugins/base"
)

const (
// circleBufferSize is the size of the in memory ring buffer used for
// go-plugin logging to stderr. When the buffer exceeds this size before
// flushing it will begin overwriting data
circleBufferSize = 64 * 1024
)

type ExecutorPluginCommand struct {
Meta
}
Expand Down Expand Up @@ -47,9 +55,14 @@ func (e *ExecutorPluginCommand) Run(args []string) int {
return 1
}

// If the client detatches from go-plugin it will block on logging to stderr.
// This buffered writer will never block on write, and instead buffer the
// writes to a ring buffer.
bufferedStderrW := circbufwriter.New(os.Stderr, circleBufferSize)

// Tee the logs to stderr and the file so that they are streamed to the
// client
out := io.MultiWriter(f, os.Stderr)
out := io.MultiWriter(f, bufferedStderrW)

// Create the logger
logger := log.New(&log.LoggerOptions{
Expand Down
9 changes: 9 additions & 0 deletions drivers/rawexec/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
procState: drivers.TaskStateRunning,
startedAt: taskState.StartedAt,
exitResult: &drivers.ExitResult{},
logger: d.logger,
doneCh: make(chan struct{}),
}

d.tasks.Set(taskState.TaskConfig.ID, h)
Expand Down Expand Up @@ -356,6 +358,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
procState: drivers.TaskStateRunning,
startedAt: time.Now().Round(time.Millisecond),
logger: d.logger,
doneCh: make(chan struct{}),
}

driverState := TaskState{
Expand Down Expand Up @@ -426,6 +429,12 @@ func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) e
return fmt.Errorf("executor Shutdown failed: %v", err)
}

// Wait for handle to finish
<-handle.doneCh

// Kill executor
handle.pluginClient.Kill()

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions drivers/rawexec/driver_pre09.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func (d *Driver) recoverPre09Task(h *drivers.TaskHandle) error {
procState: drivers.TaskStateRunning,
startedAt: time.Now(),
exitResult: &drivers.ExitResult{},
logger: d.logger,
doneCh: make(chan struct{}),
}

d.tasks.Set(h.Config.ID, th)
Expand Down
2 changes: 2 additions & 0 deletions drivers/rawexec/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type taskHandle struct {
startedAt time.Time
completedAt time.Time
exitResult *drivers.ExitResult
doneCh chan struct{}
}

func (h *taskHandle) TaskStatus() *drivers.TaskStatus {
Expand All @@ -52,6 +53,7 @@ func (h *taskHandle) IsRunning() bool {
}

func (h *taskHandle) run() {
defer close(h.doneCh)
h.stateLock.Lock()
if h.exitResult == nil {
h.exitResult = &drivers.ExitResult{}
Expand Down
116 changes: 116 additions & 0 deletions lib/circbufwriter/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package circbufwriter

import (
"io"
"sync"
"time"

"github.com/armon/circbuf"
)

type circbufWriter struct {
// circle buffer for data to write
buf *circbuf.Buffer

// error to return from the writer
err error

// bufLock syncronizes access to err and buf
bufLock sync.Mutex

// wrapped writer
wr io.Writer

// signals to flush the buffer
flushCh chan struct{}
}

// New created a circle buffered writer that wraps the given writer. The
// bufferSize is the amount of data that will be stored in memory before
// overwriting.
func New(w io.Writer, bufferSize int64) io.WriteCloser {
buf, _ := circbuf.NewBuffer(bufferSize)
c := &circbufWriter{
buf: buf,
wr: w,
flushCh: make(chan struct{}, 1),
}
go c.flushLoop()
return c
}

// Write will write the data to the buffer and attempt to flush the buffer to
// the wrapped writer. If the wrapped writer blocks on write, subsequent write
// will be written to the circle buffer.
func (c *circbufWriter) Write(p []byte) (nn int, err error) {
// If the last write returned an error, return it here. Note there is a
// small chance of missing an error if multiple writes occure at the same
// time where the last write nils out the error before it can be returned
// here.
c.bufLock.Lock()
defer c.bufLock.Unlock()
if c.err != nil {
return nn, c.err
}

// Write to the buffer
nn, err = c.buf.Write(p)

// Signal to flush the buffer
select {
case c.flushCh <- struct{}{}:
default:
// flush is blocked
}
return nn, err
}

func (c *circbufWriter) Close() error {
// Guard against double closing channel
select {
case <-c.flushCh:
default:
close(c.flushCh)
}

// if the last write errored, it will return here
c.bufLock.Lock()
defer c.bufLock.Unlock()
return c.err
}

func (c *circbufWriter) flushLoop() {
// Check buffer every 100ms in case a flush from Write was missed
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()
for {
var err error
select {
case _, ok := <-c.flushCh:
if !ok {
// Close called, exiting loop
return
}
err = c.flush()
case <-ticker.C:
err = c.flush()
}

c.bufLock.Lock()
c.err = err
c.bufLock.Unlock()
}
}

func (c *circbufWriter) flush() error {
c.bufLock.Lock()
b := c.buf.Bytes()
c.buf.Reset()
c.bufLock.Unlock()

var err error
if len(b) > 0 {
_, err = c.wr.Write(b)
}
return err
}
71 changes: 71 additions & 0 deletions lib/circbufwriter/writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package circbufwriter

import (
"bytes"
"fmt"
"io/ioutil"
"testing"

"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)

func TestWriter_NonBlockingWrite(t *testing.T) {
require := require.New(t)
var buf bytes.Buffer
w := New(&buf, 64)
n, err := w.Write([]byte("test"))
require.Equal(4, n)
require.NoError(err)

n, err = w.Write([]byte("test"))
require.Equal(4, n)
require.NoError(err)

testutil.WaitForResult(func() (bool, error) {
return "testtest" == buf.String(), fmt.Errorf("expected both writes")
}, func(err error) {
require.NoError(err)
})
}

type blockingWriter struct {
buf bytes.Buffer
unblock <-chan struct{}
}

func (b *blockingWriter) Write(p []byte) (nn int, err error) {
<-b.unblock
return b.buf.Write(p)
}

func TestWriter_BlockingWrite(t *testing.T) {
require := require.New(t)
blockCh := make(chan struct{})
bw := &blockingWriter{unblock: blockCh}
w := New(bw, 64)

n, err := w.Write([]byte("test"))
require.Equal(4, n)
require.NoError(err)
require.Empty(bw.buf.Bytes())

n, err = w.Write([]byte("test"))
require.Equal(4, n)
require.NoError(err)
require.Empty(bw.buf.Bytes())
close(blockCh)

testutil.WaitForResult(func() (bool, error) {
return "testtest" == bw.buf.String(), fmt.Errorf("expected both writes")
}, func(err error) {
require.NoError(err)
})
}

func TestWriter_CloseClose(t *testing.T) {
require := require.New(t)
w := New(ioutil.Discard, 64)
require.NoError(w.Close())
require.NoError(w.Close())
}

0 comments on commit 14c5a1c

Please sign in to comment.