diff --git a/client/driver/logging/rotator.go b/client/driver/logging/rotator.go index bd3ed86a1fd9..152ab55086d2 100644 --- a/client/driver/logging/rotator.go +++ b/client/driver/logging/rotator.go @@ -2,6 +2,7 @@ package logging import ( "bufio" + "bytes" "fmt" "io/ioutil" "log" @@ -15,8 +16,20 @@ import ( ) const ( - bufSize = 32768 - flushDur = 100 * time.Millisecond + // logBufferSize is the size of the buffer. + logBufferSize = 32 * 1024 + + // bufferFlushDuration is the duration at which we flush the buffer. + bufferFlushDuration = 100 * time.Millisecond + + // lineScanLimit is the number of bytes we will attempt to scan for new + // lines when approaching the end of the file to avoid a log line being + // split between two files. Any single line that is greater than this limit + // may be split. + lineScanLimit = 16 * 1024 + + // newLineDelimiter is the delimiter used for new lines. + newLineDelimiter = '\n' ) // FileRotator writes bytes to a rotated set of files @@ -53,7 +66,7 @@ func NewFileRotator(path string, baseFile string, maxFiles int, path: path, baseFileName: baseFile, - flushTicker: time.NewTicker(flushDur), + flushTicker: time.NewTicker(bufferFlushDuration), logger: logger, purgeCh: make(chan struct{}, 1), doneCh: make(chan struct{}, 1), @@ -70,12 +83,13 @@ func NewFileRotator(path string, baseFile string, maxFiles int, // equal to the maximum size the user has defined. func (f *FileRotator) Write(p []byte) (n int, err error) { n = 0 - var nw int + var forceRotate bool for n < len(p) { // Check if we still have space in the current file, otherwise close and // open the next file - if f.currentWr >= f.FileSize { + if forceRotate || f.currentWr >= f.FileSize { + forceRotate = false f.flushBuffer() f.currentFile.Close() if err := f.nextFile(); err != nil { @@ -83,15 +97,38 @@ func (f *FileRotator) Write(p []byte) (n int, err error) { return 0, err } } - // Calculate the remaining size on this file - remainingSize := f.FileSize - f.currentWr - - // Check if the number of bytes that we have to write is less than the - // remaining size of the file - if remainingSize < int64(len(p[n:])) { - // Write the number of bytes that we can write on the current file - li := int64(n) + remainingSize - nw, err = f.writeToBuffer(p[n:li]) + // Calculate the remaining size on this file and how much we have left + // to write + remainingSpace := f.FileSize - f.currentWr + remainingToWrite := int64(len(p[n:])) + + // Check if we are near the end of the file. If we are we attempt to + // avoid a log line being split between two files. + var nw int + if (remainingSpace - lineScanLimit) < remainingToWrite { + // Scan for new line and if the data up to new line fits in current + // file, write to buffer + idx := bytes.IndexByte(p[n:], newLineDelimiter) + if idx >= 0 && (remainingSpace-int64(idx)-1) >= 0 { + // We have space so write it to buffer + nw, err = f.writeToBuffer(p[n : n+idx+1]) + } else if idx >= 0 { + // We found a new line but don't have space so just force rotate + forceRotate = true + } else if remainingToWrite > f.FileSize || f.FileSize-lineScanLimit < 0 { + // There is no new line remaining but there is no point in + // rotating since the remaining data will not even fit in the + // next file either so just fill this one up. + li := int64(n) + remainingSpace + if remainingSpace > remainingToWrite { + li = int64(n) + remainingToWrite + } + nw, err = f.writeToBuffer(p[n:li]) + } else { + // There is no new line in the data remaining for us to write + // and it will fit in the next file so rotate. + forceRotate = true + } } else { // Write all the bytes in the current file nw, err = f.writeToBuffer(p[n:]) @@ -283,7 +320,7 @@ func (f *FileRotator) createOrResetBuffer() { f.bufLock.Lock() defer f.bufLock.Unlock() if f.bufw == nil { - f.bufw = bufio.NewWriterSize(f.currentFile, bufSize) + f.bufw = bufio.NewWriterSize(f.currentFile, logBufferSize) } else { f.bufw.Reset(f.currentFile) } diff --git a/client/driver/logging/rotator_test.go b/client/driver/logging/rotator_test.go index 68b200e872e3..4fcf23952a0b 100644 --- a/client/driver/logging/rotator_test.go +++ b/client/driver/logging/rotator_test.go @@ -4,6 +4,7 @@ import ( "fmt" "io/ioutil" "log" + "math/rand" "os" "path/filepath" "testing" @@ -168,6 +169,65 @@ func TestFileRotator_RotateFiles(t *testing.T) { }) } +func TestFileRotator_RotateFiles_Boundary(t *testing.T) { + t.Parallel() + var path string + var err error + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + t.Fatalf("test setup err: %v", err) + } + defer os.RemoveAll(path) + + fr, err := NewFileRotator(path, baseFileName, 10, 5, logger) + if err != nil { + t.Fatalf("test setup err: %v", err) + } + + // We will write three times: + // 1st: Write with new lines spanning two files + // 2nd: Write long string with no new lines + // 3rd: Write a single new line + expectations := [][]byte{ + []byte("ab\n"), + []byte("cdef\n"), + []byte("12345"), + []byte("67890"), + []byte("\n"), + } + + for _, str := range []string{"ab\ncdef\n", "1234567890", "\n"} { + nw, err := fr.Write([]byte(str)) + if err != nil { + t.Fatalf("got error while writing: %v", err) + } + + if nw != len(str) { + t.Fatalf("expected %v, got %v", len(str), nw) + } + } + + var lastErr error + testutil.WaitForResult(func() (bool, error) { + + for i, exp := range expectations { + fname := filepath.Join(path, fmt.Sprintf("redis.stdout.%d", i)) + fi, err := os.Stat(fname) + if err != nil { + lastErr = err + return false, nil + } + if int(fi.Size()) != len(exp) { + lastErr = fmt.Errorf("expected size: %v, actual: %v", len(exp), fi.Size()) + return false, nil + } + } + + return true, nil + }, func(err error) { + t.Fatalf("%v", lastErr) + }) +} + func TestFileRotator_WriteRemaining(t *testing.T) { t.Parallel() var path string @@ -289,3 +349,48 @@ func TestFileRotator_PurgeOldFiles(t *testing.T) { t.Fatalf("%v", lastErr) }) } + +func BenchmarkRotator(b *testing.B) { + kb := 1024 + for _, inputSize := range []int{kb, 2 * kb, 4 * kb, 8 * kb, 16 * kb, 32 * kb, 64 * kb, 128 * kb, 256 * kb} { + b.Run(fmt.Sprintf("%dKB", inputSize/kb), func(b *testing.B) { + benchmarkRotatorWithInputSize(inputSize, b) + }) + } +} + +func benchmarkRotatorWithInputSize(size int, b *testing.B) { + var path string + var err error + if path, err = ioutil.TempDir("", pathPrefix); err != nil { + b.Fatalf("test setup err: %v", err) + } + defer os.RemoveAll(path) + + fr, err := NewFileRotator(path, baseFileName, 5, 1024*1024, logger) + if err != nil { + b.Fatalf("test setup err: %v", err) + } + b.ResetTimer() + + // run the Fib function b.N times + for n := 0; n < b.N; n++ { + // Generate some input + data := make([]byte, size) + _, err := rand.Read(data) + if err != nil { + b.Fatalf("Error generating date: %v", err) + } + + // Insert random new lines + for i := 0; i < 100; i++ { + index := rand.Intn(size) + data[index] = '\n' + } + + // Write the data + if _, err := fr.Write(data); err != nil { + b.Fatalf("Failed to write data: %v", err) + } + } +}