Skip to content

Commit

Permalink
Merge pull request #4282 from hashicorp/f-rotator
Browse files Browse the repository at this point in the history
Avoid splitting log line across two files
  • Loading branch information
dadgar committed May 21, 2018
2 parents 2c89613 + 80ac8c2 commit df8f367
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 15 deletions.
67 changes: 52 additions & 15 deletions client/driver/logging/rotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logging

import (
"bufio"
"bytes"
"fmt"
"io/ioutil"
"log"
Expand All @@ -15,8 +16,20 @@ import (
)

const (
bufSize = 32768
flushDur = 100 * time.Millisecond
// logBufferSize is the size of the buffer.
logBufferSize = 32 * 1024

// bufferFlushDuration is the duration at which we flush the buffer.
bufferFlushDuration = 100 * time.Millisecond

// lineScanLimit is the number of bytes we will attempt to scan for new
// lines when approaching the end of the file to avoid a log line being
// split between two files. Any single line that is greater than this limit
// may be split.
lineScanLimit = 16 * 1024

// newLineDelimiter is the delimiter used for new lines.
newLineDelimiter = '\n'
)

// FileRotator writes bytes to a rotated set of files
Expand Down Expand Up @@ -53,7 +66,7 @@ func NewFileRotator(path string, baseFile string, maxFiles int,
path: path,
baseFileName: baseFile,

flushTicker: time.NewTicker(flushDur),
flushTicker: time.NewTicker(bufferFlushDuration),
logger: logger,
purgeCh: make(chan struct{}, 1),
doneCh: make(chan struct{}, 1),
Expand All @@ -70,28 +83,52 @@ func NewFileRotator(path string, baseFile string, maxFiles int,
// equal to the maximum size the user has defined.
func (f *FileRotator) Write(p []byte) (n int, err error) {
n = 0
var nw int
var forceRotate bool

for n < len(p) {
// Check if we still have space in the current file, otherwise close and
// open the next file
if f.currentWr >= f.FileSize {
if forceRotate || f.currentWr >= f.FileSize {
forceRotate = false
f.flushBuffer()
f.currentFile.Close()
if err := f.nextFile(); err != nil {
f.logger.Printf("[ERROR] driver.rotator: error creating next file: %v", err)
return 0, err
}
}
// Calculate the remaining size on this file
remainingSize := f.FileSize - f.currentWr

// Check if the number of bytes that we have to write is less than the
// remaining size of the file
if remainingSize < int64(len(p[n:])) {
// Write the number of bytes that we can write on the current file
li := int64(n) + remainingSize
nw, err = f.writeToBuffer(p[n:li])
// Calculate the remaining size on this file and how much we have left
// to write
remainingSpace := f.FileSize - f.currentWr
remainingToWrite := int64(len(p[n:]))

// Check if we are near the end of the file. If we are we attempt to
// avoid a log line being split between two files.
var nw int
if (remainingSpace - lineScanLimit) < remainingToWrite {
// Scan for new line and if the data up to new line fits in current
// file, write to buffer
idx := bytes.IndexByte(p[n:], newLineDelimiter)
if idx >= 0 && (remainingSpace-int64(idx)-1) >= 0 {
// We have space so write it to buffer
nw, err = f.writeToBuffer(p[n : n+idx+1])
} else if idx >= 0 {
// We found a new line but don't have space so just force rotate
forceRotate = true
} else if remainingToWrite > f.FileSize || f.FileSize-lineScanLimit < 0 {
// There is no new line remaining but there is no point in
// rotating since the remaining data will not even fit in the
// next file either so just fill this one up.
li := int64(n) + remainingSpace
if remainingSpace > remainingToWrite {
li = int64(n) + remainingToWrite
}
nw, err = f.writeToBuffer(p[n:li])
} else {
// There is no new line in the data remaining for us to write
// and it will fit in the next file so rotate.
forceRotate = true
}
} else {
// Write all the bytes in the current file
nw, err = f.writeToBuffer(p[n:])
Expand Down Expand Up @@ -283,7 +320,7 @@ func (f *FileRotator) createOrResetBuffer() {
f.bufLock.Lock()
defer f.bufLock.Unlock()
if f.bufw == nil {
f.bufw = bufio.NewWriterSize(f.currentFile, bufSize)
f.bufw = bufio.NewWriterSize(f.currentFile, logBufferSize)
} else {
f.bufw.Reset(f.currentFile)
}
Expand Down
105 changes: 105 additions & 0 deletions client/driver/logging/rotator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io/ioutil"
"log"
"math/rand"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -168,6 +169,65 @@ func TestFileRotator_RotateFiles(t *testing.T) {
})
}

func TestFileRotator_RotateFiles_Boundary(t *testing.T) {
t.Parallel()
var path string
var err error
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
defer os.RemoveAll(path)

fr, err := NewFileRotator(path, baseFileName, 10, 5, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}

// We will write three times:
// 1st: Write with new lines spanning two files
// 2nd: Write long string with no new lines
// 3rd: Write a single new line
expectations := [][]byte{
[]byte("ab\n"),
[]byte("cdef\n"),
[]byte("12345"),
[]byte("67890"),
[]byte("\n"),
}

for _, str := range []string{"ab\ncdef\n", "1234567890", "\n"} {
nw, err := fr.Write([]byte(str))
if err != nil {
t.Fatalf("got error while writing: %v", err)
}

if nw != len(str) {
t.Fatalf("expected %v, got %v", len(str), nw)
}
}

var lastErr error
testutil.WaitForResult(func() (bool, error) {

for i, exp := range expectations {
fname := filepath.Join(path, fmt.Sprintf("redis.stdout.%d", i))
fi, err := os.Stat(fname)
if err != nil {
lastErr = err
return false, nil
}
if int(fi.Size()) != len(exp) {
lastErr = fmt.Errorf("expected size: %v, actual: %v", len(exp), fi.Size())
return false, nil
}
}

return true, nil
}, func(err error) {
t.Fatalf("%v", lastErr)
})
}

func TestFileRotator_WriteRemaining(t *testing.T) {
t.Parallel()
var path string
Expand Down Expand Up @@ -289,3 +349,48 @@ func TestFileRotator_PurgeOldFiles(t *testing.T) {
t.Fatalf("%v", lastErr)
})
}

func BenchmarkRotator(b *testing.B) {
kb := 1024
for _, inputSize := range []int{kb, 2 * kb, 4 * kb, 8 * kb, 16 * kb, 32 * kb, 64 * kb, 128 * kb, 256 * kb} {
b.Run(fmt.Sprintf("%dKB", inputSize/kb), func(b *testing.B) {
benchmarkRotatorWithInputSize(inputSize, b)
})
}
}

func benchmarkRotatorWithInputSize(size int, b *testing.B) {
var path string
var err error
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
b.Fatalf("test setup err: %v", err)
}
defer os.RemoveAll(path)

fr, err := NewFileRotator(path, baseFileName, 5, 1024*1024, logger)
if err != nil {
b.Fatalf("test setup err: %v", err)
}
b.ResetTimer()

// run the Fib function b.N times
for n := 0; n < b.N; n++ {
// Generate some input
data := make([]byte, size)
_, err := rand.Read(data)
if err != nil {
b.Fatalf("Error generating date: %v", err)
}

// Insert random new lines
for i := 0; i < 100; i++ {
index := rand.Intn(size)
data[index] = '\n'
}

// Write the data
if _, err := fr.Write(data); err != nil {
b.Fatalf("Failed to write data: %v", err)
}
}
}

0 comments on commit df8f367

Please sign in to comment.