Skip to content

Commit

Permalink
Merge pull request #5864 from hashicorp/dani/win-pipe-cleaner
Browse files Browse the repository at this point in the history
windows: Fix restarts using the raw_exec driver
  • Loading branch information
endocrimes committed Jul 2, 2019
2 parents bd7d60e + c712fdc commit 7968f79
Show file tree
Hide file tree
Showing 14 changed files with 954 additions and 140 deletions.
1 change: 1 addition & 0 deletions appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ test_script:
gotestsum --junitfile results.xml
github.com/hashicorp/nomad/drivers/docker
github.com/hashicorp/nomad/client/lib/fifo
github.com/hashicorp/nomad/client/logmon
# on_finish:
# - ps: |
# Push-AppveyorArtifact (Resolve-Path .\results.xml)
Expand Down
14 changes: 8 additions & 6 deletions client/lib/fifo/fifo_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,23 @@ import (
)

// CreateAndRead creates a fifo at the given path, and returns an open function for reading.
// The fifo must not exist already, or that it's already a fifo file
// For compatibility with windows, the fifo must not exist already.
//
// It returns a reader open function that may block until a writer opens
// so it's advised to run it in a goroutine different from reader goroutine
func CreateAndRead(path string) (func() (io.ReadCloser, error), error) {
// create first
if err := mkfifo(path, 0600); err != nil && !os.IsExist(err) {
if err := mkfifo(path, 0600); err != nil {
return nil, fmt.Errorf("error creating fifo %v: %v", path, err)
}

openFn := func() (io.ReadCloser, error) {
return os.OpenFile(path, unix.O_RDONLY, os.ModeNamedPipe)
}
return func() (io.ReadCloser, error) {
return OpenReader(path)
}, nil
}

return openFn, nil
func OpenReader(path string) (io.ReadCloser, error) {
return os.OpenFile(path, unix.O_RDONLY, os.ModeNamedPipe)
}

// OpenWriter opens a fifo file for writer, assuming it already exists, returns io.WriteCloser
Expand Down
50 changes: 33 additions & 17 deletions client/lib/fifo/fifo_windows.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fifo

import (
"fmt"
"io"
"net"
"os"
Expand All @@ -20,50 +21,58 @@ type winFIFO struct {
connLock sync.Mutex
}

func (f *winFIFO) Read(p []byte) (n int, err error) {
func (f *winFIFO) ensureConn() (net.Conn, error) {
f.connLock.Lock()
defer f.connLock.Unlock()
if f.conn == nil {
c, err := f.listener.Accept()
if err != nil {
return 0, err
return nil, err
}

f.conn = c
}

return f.conn, nil
}

func (f *winFIFO) Read(p []byte) (n int, err error) {
conn, err := f.ensureConn()
if err != nil {
return 0, err
}

// If the connection is closed then we need to close the listener
// to emulate unix fifo behavior
n, err = f.conn.Read(p)
n, err = conn.Read(p)
if err == io.EOF {
f.listener.Close()
}
return n, err
}

func (f *winFIFO) Write(p []byte) (n int, err error) {
f.connLock.Lock()
defer f.connLock.Unlock()
if f.conn == nil {
c, err := f.listener.Accept()
if err != nil {
return 0, err
}

f.conn = c
conn, err := f.ensureConn()
if err != nil {
return 0, err
}

// If the connection is closed then we need to close the listener
// to emulate unix fifo behavior
n, err = f.conn.Write(p)
n, err = conn.Write(p)
if err == io.EOF {
conn.Close()
f.listener.Close()
}
return n, err

}

func (f *winFIFO) Close() error {
f.connLock.Lock()
if f.conn != nil {
f.conn.Close()
}
f.connLock.Unlock()
return f.listener.Close()
}

Expand All @@ -75,16 +84,23 @@ func CreateAndRead(path string) (func() (io.ReadCloser, error), error) {
OutputBufferSize: PipeBufferSize,
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create fifo: %v", err)
}

openFn := func() (io.ReadCloser, error) {
return func() (io.ReadCloser, error) {
return &winFIFO{
listener: l,
}, nil
}, nil
}

func OpenReader(path string) (io.ReadCloser, error) {
l, err := winio.ListenOnlyPipe(path, nil)
if err != nil {
return nil, fmt.Errorf("failed to open fifo listener: %v", err)
}

return openFn, nil
return &winFIFO{listener: l}, nil
}

// OpenWriter opens a fifo that already exists and returns an io.WriteCloser for it
Expand Down
25 changes: 18 additions & 7 deletions client/logmon/logmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logmon
import (
"fmt"
"io"
"os"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -199,7 +200,18 @@ func (l *logRotatorWrapper) isRunning() bool {
// processOutWriter to attach to the stdout or stderr of a process.
func newLogRotatorWrapper(path string, logger hclog.Logger, rotator *logging.FileRotator) (*logRotatorWrapper, error) {
logger.Info("opening fifo", "path", path)
fifoOpenFn, err := fifo.CreateAndRead(path)

var openFn func() (io.ReadCloser, error)
var err error

if _, ferr := os.Stat(path); os.IsNotExist(ferr) {
openFn, err = fifo.CreateAndRead(path)
} else {
openFn = func() (io.ReadCloser, error) {
return fifo.OpenReader(path)
}
}

if err != nil {
return nil, fmt.Errorf("failed to create fifo for extracting logs: %v", err)
}
Expand All @@ -211,20 +223,20 @@ func newLogRotatorWrapper(path string, logger hclog.Logger, rotator *logging.Fil
openCompleted: make(chan struct{}),
logger: logger,
}
wrap.start(fifoOpenFn)

wrap.start(openFn)
return wrap, nil
}

// start starts a goroutine that copies from the pipe into the rotator. This is
// called by the constructor and not the user of the wrapper.
func (l *logRotatorWrapper) start(readerOpenFn func() (io.ReadCloser, error)) {
func (l *logRotatorWrapper) start(openFn func() (io.ReadCloser, error)) {
go func() {
defer close(l.hasFinishedCopied)

reader, err := readerOpenFn()
reader, err := openFn()
if err != nil {
close(l.openCompleted)
l.logger.Warn("failed to open log fifo", "error", err)
l.logger.Warn("failed to open fifo", "error", err)
return
}
l.processOutReader = reader
Expand Down Expand Up @@ -284,5 +296,4 @@ func (l *logRotatorWrapper) Close() {
}

l.rotatorWriter.Close()
return
}
131 changes: 117 additions & 14 deletions client/logmon/logmon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"runtime"
"testing"

"github.com/hashicorp/nomad/client/lib/fifo"
Expand All @@ -16,19 +17,25 @@ import (

func TestLogmon_Start_rotate(t *testing.T) {
require := require.New(t)
var stdoutFifoPath, stderrFifoPath string

dir, err := ioutil.TempDir("", "nomadtest")
require.NoError(err)
defer os.RemoveAll(dir)
stdoutLog := "stdout"
stdoutFifoPath := filepath.Join(dir, "stdout.fifo")
stderrLog := "stderr"
stderrFifoPath := filepath.Join(dir, "stderr.fifo")

if runtime.GOOS == "windows" {
stdoutFifoPath = "//./pipe/test-rotate.stdout"
stderrFifoPath = "//./pipe/test-rotate.stderr"
} else {
stdoutFifoPath = filepath.Join(dir, "stdout.fifo")
stderrFifoPath = filepath.Join(dir, "stderr.fifo")
}

cfg := &LogConfig{
LogDir: dir,
StdoutLogFile: stdoutLog,
StdoutLogFile: "stdout",
StdoutFifo: stdoutFifoPath,
StderrLogFile: stderrLog,
StderrLogFile: "stderr",
StderrFifo: stderrFifoPath,
MaxFiles: 2,
MaxFileSizeMB: 1,
Expand Down Expand Up @@ -66,22 +73,33 @@ func TestLogmon_Start_rotate(t *testing.T) {
require.NoError(lm.Stop())
}

// asserts that calling Start twice restarts the log rotator
func TestLogmon_Start_restart(t *testing.T) {
// asserts that calling Start twice restarts the log rotator and that any logs
// published while the listener was unavailable are received.
func TestLogmon_Start_restart_flusheslogs(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("windows does not support pushing data to a pipe with no servers")
}

require := require.New(t)
var stdoutFifoPath, stderrFifoPath string

dir, err := ioutil.TempDir("", "nomadtest")
require.NoError(err)
defer os.RemoveAll(dir)
stdoutLog := "stdout"
stdoutFifoPath := filepath.Join(dir, "stdout.fifo")
stderrLog := "stderr"
stderrFifoPath := filepath.Join(dir, "stderr.fifo")

if runtime.GOOS == "windows" {
stdoutFifoPath = "//./pipe/test-restart.stdout"
stderrFifoPath = "//./pipe/test-restart.stderr"
} else {
stdoutFifoPath = filepath.Join(dir, "stdout.fifo")
stderrFifoPath = filepath.Join(dir, "stderr.fifo")
}

cfg := &LogConfig{
LogDir: dir,
StdoutLogFile: stdoutLog,
StdoutLogFile: "stdout",
StdoutFifo: stdoutFifoPath,
StderrLogFile: stderrLog,
StderrLogFile: "stderr",
StderrFifo: stderrFifoPath,
MaxFiles: 2,
MaxFileSizeMB: 1,
Expand Down Expand Up @@ -162,3 +180,88 @@ func TestLogmon_Start_restart(t *testing.T) {
require.NoError(err)
})
}

// asserts that calling Start twice restarts the log rotator
func TestLogmon_Start_restart(t *testing.T) {
require := require.New(t)
var stdoutFifoPath, stderrFifoPath string

dir, err := ioutil.TempDir("", "nomadtest")
require.NoError(err)
defer os.RemoveAll(dir)

if runtime.GOOS == "windows" {
stdoutFifoPath = "//./pipe/test-restart.stdout"
stderrFifoPath = "//./pipe/test-restart.stderr"
} else {
stdoutFifoPath = filepath.Join(dir, "stdout.fifo")
stderrFifoPath = filepath.Join(dir, "stderr.fifo")
}

cfg := &LogConfig{
LogDir: dir,
StdoutLogFile: "stdout",
StdoutFifo: stdoutFifoPath,
StderrLogFile: "stderr",
StderrFifo: stderrFifoPath,
MaxFiles: 2,
MaxFileSizeMB: 1,
}

lm := NewLogMon(testlog.HCLogger(t))
impl, ok := lm.(*logmonImpl)
require.True(ok)
require.NoError(lm.Start(cfg))

stdout, err := fifo.OpenWriter(stdoutFifoPath)
require.NoError(err)
stderr, err := fifo.OpenWriter(stderrFifoPath)
require.NoError(err)

// Write a string and assert it was written to the file
_, err = stdout.Write([]byte("test\n"))
require.NoError(err)

testutil.WaitForResult(func() (bool, error) {
raw, err := ioutil.ReadFile(filepath.Join(dir, "stdout.0"))
if err != nil {
return false, err
}
return "test\n" == string(raw), fmt.Errorf("unexpected stdout %q", string(raw))
}, func(err error) {
require.NoError(err)
})
require.True(impl.tl.IsRunning())

// Close stdout and assert that logmon no longer writes to the file
require.NoError(stdout.Close())
require.NoError(stderr.Close())

testutil.WaitForResult(func() (bool, error) {
return !impl.tl.IsRunning(), fmt.Errorf("logmon is still running")
}, func(err error) {
require.NoError(err)
})

// Start logmon again and assert that it can receive logs again
require.NoError(lm.Start(cfg))

stdout, err = fifo.OpenWriter(stdoutFifoPath)
require.NoError(err)
stderr, err = fifo.OpenWriter(stderrFifoPath)
require.NoError(err)

_, err = stdout.Write([]byte("test\n"))
require.NoError(err)
testutil.WaitForResult(func() (bool, error) {
raw, err := ioutil.ReadFile(filepath.Join(dir, "stdout.0"))
if err != nil {
return false, err
}

expected := "test\ntest\n" == string(raw)
return expected, fmt.Errorf("unexpected stdout %q", string(raw))
}, func(err error) {
require.NoError(err)
})
}
Loading

0 comments on commit 7968f79

Please sign in to comment.