Skip to content

Commit

Permalink
logmon: Avoid blocking main routine when opening fifo reader
Browse files Browse the repository at this point in the history
  • Loading branch information
endocrimes committed Jun 21, 2019
1 parent f26012c commit f3652ee
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 21 deletions.
13 changes: 10 additions & 3 deletions client/lib/fifo/fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
func TestFIFO(t *testing.T) {
require := require.New(t)
var path string
var reader io.ReadCloser

if runtime.GOOS == "windows" {
path = "//./pipe/fifo"
Expand All @@ -31,9 +32,8 @@ func TestFIFO(t *testing.T) {
path = filepath.Join(dir, "fifo")
}

reader, err := CreateAndRead(path)
openFn, err := CreateAndRead(path)
require.NoError(err)

toWrite := [][]byte{
[]byte("abc\n"),
[]byte(""),
Expand All @@ -53,6 +53,9 @@ func TestFIFO(t *testing.T) {
return
}

reader, err = openFn()
require.NoError(err)

_, err = io.Copy(&readBuf, reader)
assert.NoError(t, err)
}()
Expand Down Expand Up @@ -80,6 +83,7 @@ func TestFIFO(t *testing.T) {
func TestWriteClose(t *testing.T) {
require := require.New(t)
var path string
var reader io.ReadCloser

if runtime.GOOS == "windows" {
path = "//./pipe/" + uuid.Generate()[:4]
Expand All @@ -91,7 +95,7 @@ func TestWriteClose(t *testing.T) {
path = filepath.Join(dir, "fifo")
}

reader, err := CreateAndRead(path)
openFn, err := CreateAndRead(path)
require.NoError(err)

var readBuf bytes.Buffer
Expand All @@ -105,6 +109,9 @@ func TestWriteClose(t *testing.T) {
return
}

reader, err = openFn()
require.NoError(err)

_, err = io.Copy(&readBuf, reader)
assert.NoError(t, err)
}()
Expand Down
6 changes: 4 additions & 2 deletions client/lib/fifo/fifo_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ import (
//
// It returns a reader open function that may block until a writer opens
// so it's advised to run it in a goroutine different from reader goroutine
func CreateAndRead(path string) (io.ReadCloser, error) {
func CreateAndRead(path string) (func() (io.ReadCloser, error), error) {
// create first
if err := mkfifo(path, 0600); err != nil && !os.IsExist(err) {
return nil, fmt.Errorf("error creating fifo %v: %v", path, err)
}

return os.OpenFile(path, unix.O_RDONLY, os.ModeNamedPipe)
return func() (io.ReadCloser, error) {
return os.OpenFile(path, unix.O_RDONLY, os.ModeNamedPipe)
}, nil
}

func OpenReader(path string) (io.ReadCloser, error) {
Expand Down
8 changes: 5 additions & 3 deletions client/lib/fifo/fifo_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (f *winFIFO) Close() error {

// CreateAndRead creates a fifo at the given path and returns an io.ReadCloser open for it.
// The fifo must not already exist
func CreateAndRead(path string) (io.ReadCloser, error) {
func CreateAndRead(path string) (func() (io.ReadCloser, error), error) {
l, err := winio.ListenPipe(path, &winio.PipeConfig{
InputBufferSize: PipeBufferSize,
OutputBufferSize: PipeBufferSize,
Expand All @@ -78,8 +78,10 @@ func CreateAndRead(path string) (io.ReadCloser, error) {
return nil, err
}

return &winFIFO{
listener: l,
return func() (io.ReadCloser, error) {
return &winFIFO{
listener: l,
}, nil
}, nil
}

Expand Down
38 changes: 25 additions & 13 deletions client/logmon/logmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logmon
import (
"fmt"
"io"
"os"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -200,14 +201,19 @@ func (l *logRotatorWrapper) isRunning() bool {
func newLogRotatorWrapper(path string, logger hclog.Logger, rotator *logging.FileRotator) (*logRotatorWrapper, error) {
logger.Info("opening fifo", "path", path)

var reader io.ReadCloser
var openErr, createErr error
reader, openErr = fifo.OpenReader(path)
if openErr != nil {
reader, createErr = fifo.CreateAndRead(path)
var openFn func() (io.ReadCloser, error)
var err error

if _, ferr := os.Stat(path); os.IsNotExist(ferr) {
openFn, err = fifo.CreateAndRead(path)
} else {
openFn = func() (io.ReadCloser, error) {
return fifo.OpenReader(path)
}
}
if openErr != nil && createErr != nil {
return nil, fmt.Errorf("failed to create fifo for extracting logs:\n- %v\n- %v", createErr, openErr)

if err != nil {
return nil, fmt.Errorf("failed to create fifo for extracting logs: %v", err)
}

wrap := &logRotatorWrapper{
Expand All @@ -218,19 +224,25 @@ func newLogRotatorWrapper(path string, logger hclog.Logger, rotator *logging.Fil
logger: logger,
}

wrap.processOutReader = reader
close(wrap.openCompleted)

wrap.start(reader)
wrap.start(openFn)
return wrap, nil
}

// start starts a goroutine that copies from the pipe into the rotator. This is
// called by the constructor and not the user of the wrapper.
func (l *logRotatorWrapper) start(reader io.ReadCloser) {
func (l *logRotatorWrapper) start(openFn func() (io.ReadCloser, error)) {
go func() {
defer close(l.hasFinishedCopied)
_, err := io.Copy(l.rotatorWriter, reader)

reader, err := openFn()
close(l.openCompleted)
if err != nil {
l.logger.Warn("failed to open fifo", "error", err)
return
}
l.processOutReader = reader

_, err = io.Copy(l.rotatorWriter, reader)
if err != nil {
l.logger.Warn("failed to read from log fifo", "error", err)
// Close reader to propagate io error across pipe.
Expand Down

0 comments on commit f3652ee

Please sign in to comment.