Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

windows: Fix restarts using the raw_exec driver #5864

Merged
merged 7 commits into from
Jul 2, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ test_script:
gotestsum --junitfile results.xml
github.com/hashicorp/nomad/drivers/docker
github.com/hashicorp/nomad/client/lib/fifo
github.com/hashicorp/nomad/client/logmon
# on_finish:
# - ps: |
# Push-AppveyorArtifact (Resolve-Path .\results.xml)
Expand Down
14 changes: 8 additions & 6 deletions client/lib/fifo/fifo_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,23 @@ import (
)

// CreateAndRead creates a fifo at the given path, and returns an open function for reading.
// The fifo must not exist already, or that it's already a fifo file
// For compatibility with windows, the fifo must not exist already.
//
// It returns a reader open function that may block until a writer opens
// so it's advised to run it in a goroutine different from reader goroutine
func CreateAndRead(path string) (func() (io.ReadCloser, error), error) {
// create first
if err := mkfifo(path, 0600); err != nil && !os.IsExist(err) {
if err := mkfifo(path, 0600); err != nil {
return nil, fmt.Errorf("error creating fifo %v: %v", path, err)
}

openFn := func() (io.ReadCloser, error) {
return os.OpenFile(path, unix.O_RDONLY, os.ModeNamedPipe)
}
return func() (io.ReadCloser, error) {
return OpenReader(path)
}, nil
endocrimes marked this conversation as resolved.
Show resolved Hide resolved
}

return openFn, nil
func OpenReader(path string) (io.ReadCloser, error) {
return os.OpenFile(path, unix.O_RDONLY, os.ModeNamedPipe)
}

// OpenWriter opens a fifo file for writer, assuming it already exists, returns io.WriteCloser
Expand Down
24 changes: 19 additions & 5 deletions client/lib/fifo/fifo_windows.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fifo

import (
"fmt"
"io"
"net"
"os"
Expand All @@ -22,7 +23,6 @@ type winFIFO struct {

func (f *winFIFO) Read(p []byte) (n int, err error) {
f.connLock.Lock()
defer f.connLock.Unlock()
if f.conn == nil {
c, err := f.listener.Accept()
if err != nil {
Expand All @@ -31,6 +31,7 @@ func (f *winFIFO) Read(p []byte) (n int, err error) {

endocrimes marked this conversation as resolved.
Show resolved Hide resolved
f.conn = c
}
f.connLock.Unlock()
endocrimes marked this conversation as resolved.
Show resolved Hide resolved

// If the connection is closed then we need to close the listener
// to emulate unix fifo behavior
Expand All @@ -43,7 +44,6 @@ func (f *winFIFO) Read(p []byte) (n int, err error) {

func (f *winFIFO) Write(p []byte) (n int, err error) {
f.connLock.Lock()
defer f.connLock.Unlock()
if f.conn == nil {
c, err := f.listener.Accept()
if err != nil {
Expand All @@ -52,18 +52,25 @@ func (f *winFIFO) Write(p []byte) (n int, err error) {

f.conn = c
}
f.connLock.Unlock()

// If the connection is closed then we need to close the listener
// to emulate unix fifo behavior
n, err = f.conn.Write(p)
if err == io.EOF {
f.conn.Close()
f.listener.Close()
}
return n, err

}

func (f *winFIFO) Close() error {
f.connLock.Lock()
if f.conn != nil {
f.conn.Close()
}
f.connLock.Unlock()
endocrimes marked this conversation as resolved.
Show resolved Hide resolved
return f.listener.Close()
}

Expand All @@ -75,16 +82,23 @@ func CreateAndRead(path string) (func() (io.ReadCloser, error), error) {
OutputBufferSize: PipeBufferSize,
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create fifo: %v", err)
}

openFn := func() (io.ReadCloser, error) {
return func() (io.ReadCloser, error) {
return &winFIFO{
listener: l,
}, nil
}, nil
}

func OpenReader(path string) (io.ReadCloser, error) {
l, err := winio.ListenOnlyPipe(path, nil)
if err != nil {
return nil, fmt.Errorf("failed to open fifo listener: %v", err)
}

return openFn, nil
return &winFIFO{listener: l}, nil
}

// OpenWriter opens a fifo that already exists and returns an io.WriteCloser for it
Expand Down
25 changes: 18 additions & 7 deletions client/logmon/logmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logmon
import (
"fmt"
"io"
"os"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -199,7 +200,18 @@ func (l *logRotatorWrapper) isRunning() bool {
// processOutWriter to attach to the stdout or stderr of a process.
func newLogRotatorWrapper(path string, logger hclog.Logger, rotator *logging.FileRotator) (*logRotatorWrapper, error) {
logger.Info("opening fifo", "path", path)
fifoOpenFn, err := fifo.CreateAndRead(path)

var openFn func() (io.ReadCloser, error)
var err error

if _, ferr := os.Stat(path); os.IsNotExist(ferr) {
openFn, err = fifo.CreateAndRead(path)
} else {
openFn = func() (io.ReadCloser, error) {
return fifo.OpenReader(path)
}
}

if err != nil {
return nil, fmt.Errorf("failed to create fifo for extracting logs: %v", err)
}
Expand All @@ -211,20 +223,20 @@ func newLogRotatorWrapper(path string, logger hclog.Logger, rotator *logging.Fil
openCompleted: make(chan struct{}),
logger: logger,
}
wrap.start(fifoOpenFn)

wrap.start(openFn)
return wrap, nil
}

// start starts a goroutine that copies from the pipe into the rotator. This is
// called by the constructor and not the user of the wrapper.
func (l *logRotatorWrapper) start(readerOpenFn func() (io.ReadCloser, error)) {
func (l *logRotatorWrapper) start(openFn func() (io.ReadCloser, error)) {
go func() {
defer close(l.hasFinishedCopied)

reader, err := readerOpenFn()
reader, err := openFn()
if err != nil {
close(l.openCompleted)
l.logger.Warn("failed to open log fifo", "error", err)
l.logger.Warn("failed to open fifo", "error", err)
return
}
l.processOutReader = reader
Expand Down Expand Up @@ -284,5 +296,4 @@ func (l *logRotatorWrapper) Close() {
}

l.rotatorWriter.Close()
return
}
131 changes: 117 additions & 14 deletions client/logmon/logmon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"runtime"
"testing"

"github.com/hashicorp/nomad/client/lib/fifo"
Expand All @@ -16,19 +17,25 @@ import (

func TestLogmon_Start_rotate(t *testing.T) {
require := require.New(t)
var stdoutFifoPath, stderrFifoPath string

dir, err := ioutil.TempDir("", "nomadtest")
require.NoError(err)
defer os.RemoveAll(dir)
stdoutLog := "stdout"
stdoutFifoPath := filepath.Join(dir, "stdout.fifo")
stderrLog := "stderr"
stderrFifoPath := filepath.Join(dir, "stderr.fifo")

if runtime.GOOS == "windows" {
stdoutFifoPath = "//./pipe/test-rotate.stdout"
stderrFifoPath = "//./pipe/test-rotate.stderr"
} else {
stdoutFifoPath = filepath.Join(dir, "stdout.fifo")
stderrFifoPath = filepath.Join(dir, "stderr.fifo")
}

cfg := &LogConfig{
LogDir: dir,
StdoutLogFile: stdoutLog,
StdoutLogFile: "stdout",
StdoutFifo: stdoutFifoPath,
StderrLogFile: stderrLog,
StderrLogFile: "stderr",
StderrFifo: stderrFifoPath,
MaxFiles: 2,
MaxFileSizeMB: 1,
Expand Down Expand Up @@ -66,22 +73,33 @@ func TestLogmon_Start_rotate(t *testing.T) {
require.NoError(lm.Stop())
}

// asserts that calling Start twice restarts the log rotator
func TestLogmon_Start_restart(t *testing.T) {
// asserts that calling Start twice restarts the log rotator and that any logs
// published while the listener was unavailable are received.
func TestLogmon_Start_restart_flusheslogs(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("windows does not support pushing data to a pipe with no servers")
}

require := require.New(t)
var stdoutFifoPath, stderrFifoPath string

dir, err := ioutil.TempDir("", "nomadtest")
require.NoError(err)
defer os.RemoveAll(dir)
stdoutLog := "stdout"
stdoutFifoPath := filepath.Join(dir, "stdout.fifo")
stderrLog := "stderr"
stderrFifoPath := filepath.Join(dir, "stderr.fifo")

if runtime.GOOS == "windows" {
stdoutFifoPath = "//./pipe/test-restart.stdout"
stderrFifoPath = "//./pipe/test-restart.stderr"
} else {
stdoutFifoPath = filepath.Join(dir, "stdout.fifo")
stderrFifoPath = filepath.Join(dir, "stderr.fifo")
}

cfg := &LogConfig{
LogDir: dir,
StdoutLogFile: stdoutLog,
StdoutLogFile: "stdout",
StdoutFifo: stdoutFifoPath,
StderrLogFile: stderrLog,
StderrLogFile: "stderr",
StderrFifo: stderrFifoPath,
MaxFiles: 2,
MaxFileSizeMB: 1,
Expand Down Expand Up @@ -162,3 +180,88 @@ func TestLogmon_Start_restart(t *testing.T) {
require.NoError(err)
})
}

// asserts that calling Start twice restarts the log rotator
func TestLogmon_Start_restart(t *testing.T) {
require := require.New(t)
var stdoutFifoPath, stderrFifoPath string

dir, err := ioutil.TempDir("", "nomadtest")
require.NoError(err)
defer os.RemoveAll(dir)

if runtime.GOOS == "windows" {
stdoutFifoPath = "//./pipe/test-restart.stdout"
stderrFifoPath = "//./pipe/test-restart.stderr"
} else {
stdoutFifoPath = filepath.Join(dir, "stdout.fifo")
stderrFifoPath = filepath.Join(dir, "stderr.fifo")
}

cfg := &LogConfig{
LogDir: dir,
StdoutLogFile: "stdout",
StdoutFifo: stdoutFifoPath,
StderrLogFile: "stderr",
StderrFifo: stderrFifoPath,
MaxFiles: 2,
MaxFileSizeMB: 1,
}

lm := NewLogMon(testlog.HCLogger(t))
impl, ok := lm.(*logmonImpl)
require.True(ok)
require.NoError(lm.Start(cfg))

stdout, err := fifo.OpenWriter(stdoutFifoPath)
require.NoError(err)
stderr, err := fifo.OpenWriter(stderrFifoPath)
require.NoError(err)

// Write a string and assert it was written to the file
_, err = stdout.Write([]byte("test\n"))
require.NoError(err)

testutil.WaitForResult(func() (bool, error) {
raw, err := ioutil.ReadFile(filepath.Join(dir, "stdout.0"))
if err != nil {
return false, err
}
return "test\n" == string(raw), fmt.Errorf("unexpected stdout %q", string(raw))
}, func(err error) {
require.NoError(err)
})
require.True(impl.tl.IsRunning())

// Close stdout and assert that logmon no longer writes to the file
require.NoError(stdout.Close())
require.NoError(stderr.Close())

testutil.WaitForResult(func() (bool, error) {
return !impl.tl.IsRunning(), fmt.Errorf("logmon is still running")
}, func(err error) {
require.NoError(err)
})

// Start logmon again and assert that it can receive logs again
require.NoError(lm.Start(cfg))

stdout, err = fifo.OpenWriter(stdoutFifoPath)
require.NoError(err)
stderr, err = fifo.OpenWriter(stderrFifoPath)
require.NoError(err)

_, err = stdout.Write([]byte("test\n"))
require.NoError(err)
testutil.WaitForResult(func() (bool, error) {
raw, err := ioutil.ReadFile(filepath.Join(dir, "stdout.0"))
if err != nil {
return false, err
}

expected := "test\ntest\n" == string(raw)
return expected, fmt.Errorf("unexpected stdout %q", string(raw))
}, func(err error) {
require.NoError(err)
})
}
10 changes: 10 additions & 0 deletions vendor/github.com/Microsoft/go-winio/file.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading