Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid splitting log line across two files #4282

Merged
merged 4 commits into from
May 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 52 additions & 15 deletions client/driver/logging/rotator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logging

import (
"bufio"
"bytes"
"fmt"
"io/ioutil"
"log"
Expand All @@ -15,8 +16,20 @@ import (
)

const (
bufSize = 32768
flushDur = 100 * time.Millisecond
// logBufferSize is the size of the buffer.
logBufferSize = 32 * 1024

// bufferFlushDuration is the duration at which we flush the buffer.
bufferFlushDuration = 100 * time.Millisecond

// lineScanLimit is the number of bytes we will attempt to scan for new
// lines when approaching the end of the file to avoid a log line being
// split between two files. Any single line that is greater than this limit
// may be split.
lineScanLimit = 16 * 1024

// newLineDelimiter is the delimiter used for new lines.
newLineDelimiter = '\n'
)

// FileRotator writes bytes to a rotated set of files
Expand Down Expand Up @@ -53,7 +66,7 @@ func NewFileRotator(path string, baseFile string, maxFiles int,
path: path,
baseFileName: baseFile,

flushTicker: time.NewTicker(flushDur),
flushTicker: time.NewTicker(bufferFlushDuration),
logger: logger,
purgeCh: make(chan struct{}, 1),
doneCh: make(chan struct{}, 1),
Expand All @@ -70,28 +83,52 @@ func NewFileRotator(path string, baseFile string, maxFiles int,
// equal to the maximum size the user has defined.
func (f *FileRotator) Write(p []byte) (n int, err error) {
n = 0
var nw int
var forceRotate bool

for n < len(p) {
// Check if we still have space in the current file, otherwise close and
// open the next file
if f.currentWr >= f.FileSize {
if forceRotate || f.currentWr >= f.FileSize {
forceRotate = false
f.flushBuffer()
f.currentFile.Close()
if err := f.nextFile(); err != nil {
f.logger.Printf("[ERROR] driver.rotator: error creating next file: %v", err)
return 0, err
}
}
// Calculate the remaining size on this file
remainingSize := f.FileSize - f.currentWr

// Check if the number of bytes that we have to write is less than the
// remaining size of the file
if remainingSize < int64(len(p[n:])) {
// Write the number of bytes that we can write on the current file
li := int64(n) + remainingSize
nw, err = f.writeToBuffer(p[n:li])
// Calculate the remaining size on this file and how much we have left
// to write
remainingSpace := f.FileSize - f.currentWr
remainingToWrite := int64(len(p[n:]))

// Check if we are near the end of the file. If we are we attempt to
// avoid a log line being split between two files.
var nw int
if (remainingSpace - lineScanLimit) < remainingToWrite {
// Scan for new line and if the data up to new line fits in current
// file, write to buffer
idx := bytes.IndexByte(p[n:], newLineDelimiter)
if idx >= 0 && (remainingSpace-int64(idx)-1) >= 0 {
Copy link
Contributor

@preetapan preetapan May 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this conditional work right if the first char in p is a new line, i.e idx=0?
IIUC it does work, It would write just the new line char to the buffer and then get back to the for loop.

Might want to make a unit test for such edge cases though?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that is what the minus 1 is for. Since a write of p[n:n] actually writes 1 byte so I have to check if there is room for whatever index + 1

// We have space so write it to buffer
nw, err = f.writeToBuffer(p[n : n+idx+1])
} else if idx >= 0 {
// We found a new line but don't have space so just force rotate
forceRotate = true
} else if remainingToWrite > f.FileSize || f.FileSize-lineScanLimit < 0 {
// There is no new line remaining but there is no point in
// rotating since the remaining data will not even fit in the
// next file either so just fill this one up.
li := int64(n) + remainingSpace
if remainingSpace > remainingToWrite {
li = int64(n) + remainingToWrite
}
nw, err = f.writeToBuffer(p[n:li])
} else {
// There is no new line in the data remaining for us to write
// and it will fit in the next file so rotate.
forceRotate = true
}
} else {
// Write all the bytes in the current file
nw, err = f.writeToBuffer(p[n:])
Expand Down Expand Up @@ -283,7 +320,7 @@ func (f *FileRotator) createOrResetBuffer() {
f.bufLock.Lock()
defer f.bufLock.Unlock()
if f.bufw == nil {
f.bufw = bufio.NewWriterSize(f.currentFile, bufSize)
f.bufw = bufio.NewWriterSize(f.currentFile, logBufferSize)
} else {
f.bufw.Reset(f.currentFile)
}
Expand Down
105 changes: 105 additions & 0 deletions client/driver/logging/rotator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io/ioutil"
"log"
"math/rand"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -168,6 +169,65 @@ func TestFileRotator_RotateFiles(t *testing.T) {
})
}

func TestFileRotator_RotateFiles_Boundary(t *testing.T) {
t.Parallel()
var path string
var err error
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
t.Fatalf("test setup err: %v", err)
}
defer os.RemoveAll(path)

fr, err := NewFileRotator(path, baseFileName, 10, 5, logger)
if err != nil {
t.Fatalf("test setup err: %v", err)
}

// We will write three times:
// 1st: Write with new lines spanning two files
// 2nd: Write long string with no new lines
// 3rd: Write a single new line
expectations := [][]byte{
[]byte("ab\n"),
[]byte("cdef\n"),
[]byte("12345"),
[]byte("67890"),
[]byte("\n"),
}

for _, str := range []string{"ab\ncdef\n", "1234567890", "\n"} {
nw, err := fr.Write([]byte(str))
if err != nil {
t.Fatalf("got error while writing: %v", err)
}

if nw != len(str) {
t.Fatalf("expected %v, got %v", len(str), nw)
}
}

var lastErr error
testutil.WaitForResult(func() (bool, error) {

for i, exp := range expectations {
fname := filepath.Join(path, fmt.Sprintf("redis.stdout.%d", i))
fi, err := os.Stat(fname)
if err != nil {
lastErr = err
return false, nil
}
if int(fi.Size()) != len(exp) {
lastErr = fmt.Errorf("expected size: %v, actual: %v", len(exp), fi.Size())
return false, nil
}
}

return true, nil
}, func(err error) {
t.Fatalf("%v", lastErr)
})
}

func TestFileRotator_WriteRemaining(t *testing.T) {
t.Parallel()
var path string
Expand Down Expand Up @@ -289,3 +349,48 @@ func TestFileRotator_PurgeOldFiles(t *testing.T) {
t.Fatalf("%v", lastErr)
})
}

func BenchmarkRotator(b *testing.B) {
kb := 1024
for _, inputSize := range []int{kb, 2 * kb, 4 * kb, 8 * kb, 16 * kb, 32 * kb, 64 * kb, 128 * kb, 256 * kb} {
b.Run(fmt.Sprintf("%dKB", inputSize/kb), func(b *testing.B) {
benchmarkRotatorWithInputSize(inputSize, b)
})
}
}

func benchmarkRotatorWithInputSize(size int, b *testing.B) {
var path string
var err error
if path, err = ioutil.TempDir("", pathPrefix); err != nil {
b.Fatalf("test setup err: %v", err)
}
defer os.RemoveAll(path)

fr, err := NewFileRotator(path, baseFileName, 5, 1024*1024, logger)
if err != nil {
b.Fatalf("test setup err: %v", err)
}
b.ResetTimer()

// run the Fib function b.N times
for n := 0; n < b.N; n++ {
// Generate some input
data := make([]byte, size)
_, err := rand.Read(data)
if err != nil {
b.Fatalf("Error generating date: %v", err)
}

// Insert random new lines
for i := 0; i < 100; i++ {
index := rand.Intn(size)
data[index] = '\n'
}

// Write the data
if _, err := fr.Write(data); err != nil {
b.Fatalf("Failed to write data: %v", err)
}
}
}