Decouple logfile from tailfile.
Consumers of `LogFile` now pass in a function that returns an
io.Reader covering the requested number of lines to tail, so `LogFile`
no longer depends on the tailfile package directly.

This is also much more efficient when tailing a large number of lines.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
cpuguy83 committed Aug 11, 2018
1 parent 874867d commit 94a1015
Showing 6 changed files with 236 additions and 50 deletions.
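To make the new contract concrete, here is a rough consumer sketch. It is not part of this commit; it only uses the NewLogFile signature and the SizeReaderAt/GetTailReaderFunc types introduced below, and the marshal/decode helpers are stand-ins (the real ones live in jsonfilelog):

package main

import (
	"context"
	"io"

	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/daemon/logger/loggerutils"
	"github.com/docker/docker/pkg/tailfile"
)

func main() {
	// How to tail a file is now the caller's decision; delegating to
	// pkg/tailfile reproduces the previous behavior.
	getTail := func(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) {
		return tailfile.NewTailReader(ctx, r, req)
	}

	// Stand-in marshal/decode functions for illustration only.
	marshal := func(msg *logger.Message) ([]byte, error) { return append(msg.Line, '\n'), nil }
	decode := func(rdr io.Reader) func() (*logger.Message, error) {
		return func() (*logger.Message, error) { return nil, io.EOF }
	}

	lf, err := loggerutils.NewLogFile("/tmp/container.log", 10*1024*1024, 5, true, marshal, decode, 0640, getTail)
	if err != nil {
		panic(err)
	}
	defer lf.Close()
}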
daemon/logger/jsonfilelog/jsonfilelog.go: 2 changes (1 addition, 1 deletion)
@@ -110,7 +110,7 @@ func New(info logger.Info) (logger.Logger, error) {
return b, nil
}

-	writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, marshalFunc, decodeFunc, 0640)
+	writer, err := loggerutils.NewLogFile(info.LogPath, capval, maxFiles, compress, marshalFunc, decodeFunc, 0640, getTailReader)
if err != nil {
return nil, err
}
daemon/logger/jsonfilelog/jsonfilelog_test.go: 47 changes (32 additions, 15 deletions)
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
@@ -107,7 +108,10 @@ func BenchmarkJSONFileLoggerLog(b *testing.B) {
ContainerID: "a7317399f3f857173c6179d44823594f8294678dea9999662e5c625b5a1c7657",
LogPath: tmp.Join("container.log"),
Config: map[string]string{
"labels": "first,second",
"labels": "first,second",
"max-file": "10",
"compress": "true",
"max-size": "20m",
},
ContainerLabels: map[string]string{
"first": "label_value",
@@ -117,21 +121,34 @@
assert.NilError(b, err)
defer jsonlogger.Close()

-	msg := &logger.Message{
-		Line:      []byte("Line that thinks that it is log line from docker\n"),
-		Source:    "stderr",
-		Timestamp: time.Now().UTC(),
-	}
-
-	buf := bytes.NewBuffer(nil)
-	assert.NilError(b, marshalMessage(msg, nil, buf))
-	b.SetBytes(int64(buf.Len()))
+	t := time.Now().UTC()
+	for _, data := range [][]byte{
+		[]byte(""),
+		[]byte("a short string"),
+		bytes.Repeat([]byte("a long string"), 100),
+		bytes.Repeat([]byte("a really long string"), 10000),
+	} {
+		b.Run(fmt.Sprintf("%d", len(data)), func(b *testing.B) {
+			testMsg := &logger.Message{
+				Line:      data,
+				Source:    "stderr",
+				Timestamp: t,
+			}

-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		if err := jsonlogger.Log(msg); err != nil {
-			b.Fatal(err)
-		}
+			buf := bytes.NewBuffer(nil)
+			assert.NilError(b, marshalMessage(testMsg, nil, buf))
+			b.SetBytes(int64(buf.Len()))
+			b.ResetTimer()
+			for i := 0; i < b.N; i++ {
+				msg := logger.NewMessage()
+				msg.Line = testMsg.Line
+				msg.Timestamp = testMsg.Timestamp
+				msg.Source = testMsg.Source
+				if err := jsonlogger.Log(msg); err != nil {
+					b.Fatal(err)
+				}
+			}
+		})
}
}

daemon/logger/jsonfilelog/read.go: 14 changes (11 additions, 3 deletions)
@@ -1,12 +1,16 @@
package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelog"

import (
"context"
"encoding/json"
"io"

"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/jsonfilelog/jsonlog"
"github.com/docker/docker/daemon/logger/loggerutils"
"github.com/docker/docker/pkg/tailfile"
"github.com/sirupsen/logrus"
)

const maxJSONDecodeRetry = 20000
@@ -63,14 +67,14 @@ func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
return func() (msg *logger.Message, err error) {
for retries := 0; retries < maxJSONDecodeRetry; retries++ {
msg, err = decodeLogLine(dec, l)
-			if err == nil {
+			if err == nil || err == io.EOF {
break
}

logrus.WithError(err).WithField("retries", retries).Warn("got error while decoding json")
// try again, could be due to an incomplete json object as we read
if _, ok := err.(*json.SyntaxError); ok {
dec = json.NewDecoder(rdr)
-			retries++
continue
}

@@ -81,9 +85,13 @@ func decodeFunc(rdr io.Reader) func() (*logger.Message, error) {
if err == io.ErrUnexpectedEOF {
reader := io.MultiReader(dec.Buffered(), rdr)
dec = json.NewDecoder(reader)
-			retries++
continue
}
}
return msg, err
}
}

func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) {
return tailfile.NewTailReader(ctx, r, req)
}
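For reference, pkg/tailfile's NewTailReader (used by the adapter above) takes any ReaderAt that reports its size and returns a reader over at most the requested number of trailing lines, plus the line count it actually found. A small illustrative sketch, assuming those semantics (strings.Reader happens to satisfy the interface):

package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"strings"

	"github.com/docker/docker/pkg/tailfile"
)

func main() {
	r := strings.NewReader("a\nb\nc\nd\ne\n") // five newline-terminated lines
	tail, n, err := tailfile.NewTailReader(context.Background(), r, 2)
	if err != nil {
		panic(err)
	}
	data, _ := ioutil.ReadAll(tail)
	fmt.Printf("%d lines: %q\n", n, data) // expected: 2 lines: "d\ne\n"
}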
daemon/logger/jsonfilelog/read_test.go: 30 changes (30 additions, 0 deletions)
@@ -2,6 +2,7 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelo

import (
"bytes"
"io"
"testing"
"time"

@@ -62,3 +63,32 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) {
}
}
}

func TestEncodeDecode(t *testing.T) {
t.Parallel()

m1 := &logger.Message{Line: []byte("hello 1"), Timestamp: time.Now(), Source: "stdout"}
m2 := &logger.Message{Line: []byte("hello 2"), Timestamp: time.Now(), Source: "stdout"}
m3 := &logger.Message{Line: []byte("hello 3"), Timestamp: time.Now(), Source: "stdout"}

buf := bytes.NewBuffer(nil)
assert.Assert(t, marshalMessage(m1, nil, buf))
assert.Assert(t, marshalMessage(m2, nil, buf))
assert.Assert(t, marshalMessage(m3, nil, buf))

decode := decodeFunc(buf)
msg, err := decode()
assert.Assert(t, err)
assert.Assert(t, string(msg.Line) == "hello 1\n", string(msg.Line))

msg, err = decode()
assert.Assert(t, err)
assert.Assert(t, string(msg.Line) == "hello 2\n")

msg, err = decode()
assert.Assert(t, err)
assert.Assert(t, string(msg.Line) == "hello 3\n")

_, err = decode()
assert.Assert(t, err == io.EOF)
}
daemon/logger/loggerutils/logfile.go: 117 changes (86 additions, 31 deletions)
@@ -1,7 +1,6 @@
package loggerutils // import "github.com/docker/docker/daemon/logger/loggerutils"

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
@@ -14,11 +13,9 @@ import (
"time"

"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/loggerutils/multireader"
"github.com/docker/docker/pkg/filenotify"
"github.com/docker/docker/pkg/pools"
"github.com/docker/docker/pkg/pubsub"
"github.com/docker/docker/pkg/tailfile"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -92,13 +89,27 @@ type LogFile struct {
notifyRotate *pubsub.Publisher
marshal logger.MarshalFunc
createDecoder makeDecoderFunc
+	getTailReader GetTailReaderFunc
perms os.FileMode
}

type makeDecoderFunc func(rdr io.Reader) func() (*logger.Message, error)

// SizeReaderAt defines a ReaderAt that also reports its size.
// This is used for tailing log files.
type SizeReaderAt interface {
io.ReaderAt
Size() int64
}

// GetTailReaderFunc is used to truncate a reader to only read as much as is required
// in order to get the passed in number of log lines.
// It returns the sectioned reader, the number of lines that the section reader
// contains, and any error that occurs.
type GetTailReaderFunc func(ctx context.Context, f SizeReaderAt, nLogLines int) (rdr io.Reader, nLines int, err error)

// NewLogFile creates new LogFile
-func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode) (*LogFile, error) {
+func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, marshaller logger.MarshalFunc, decodeFunc makeDecoderFunc, perms os.FileMode, getTailReader GetTailReaderFunc) (*LogFile, error) {
log, err := os.OpenFile(logPath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, perms)
if err != nil {
return nil, err
@@ -120,6 +131,7 @@ func NewLogFile(logPath string, capacity int64, maxFiles int, compress bool, mar
marshal: marshaller,
createDecoder: decodeFunc,
perms: perms,
+		getTailReader: getTailReader,
}, nil
}

@@ -309,33 +321,45 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
}

if config.Tail != 0 {
+		// TODO(@cpuguy83): Instead of opening every file, only get the files which
+		// are needed to tail.
+		// This is especially costly when compression is enabled.
		files, err := w.openRotatedFiles(config)
+		w.mu.RUnlock()
		if err != nil {
-			w.mu.RUnlock()
			watcher.Err <- err
			return
		}
-		w.mu.RUnlock()
-		seekers := make([]io.ReadSeeker, 0, len(files)+1)
-		for _, f := range files {
-			seekers = append(seekers, f)
-		}
-		if currentChunk.Size() > 0 {
-			seekers = append(seekers, currentChunk)
-		}
-		if len(seekers) > 0 {
-			tailFile(multireader.MultiReadSeeker(seekers...), watcher, w.createDecoder, config)
+
+		closeFiles := func() {
+			for _, f := range files {
+				f.Close()
+				fileName := f.Name()
+				if strings.HasSuffix(fileName, tmpLogfileSuffix) {
+					err := w.filesRefCounter.Dereference(fileName)
+					if err != nil {
+						logrus.Errorf("Failed to dereference the log file %q: %v", fileName, err)
+					}
+				}
+			}
		}
+
+		readers := make([]SizeReaderAt, 0, len(files)+1)
		for _, f := range files {
-			f.Close()
-			fileName := f.Name()
-			if strings.HasSuffix(fileName, tmpLogfileSuffix) {
-				err := w.filesRefCounter.Dereference(fileName)
-				if err != nil {
-					logrus.Errorf("Failed to dereference log file %q: %v", fileName, err)
-				}
+			stat, err := f.Stat()
+			if err != nil {
+				watcher.Err <- errors.Wrap(err, "error reading size of rotated file")
+				closeFiles()
+				return
			}
+			readers = append(readers, io.NewSectionReader(f, 0, stat.Size()))
		}
+		if currentChunk.Size() > 0 {
+			readers = append(readers, currentChunk)
+		}
+
+		tailFiles(readers, watcher, w.createDecoder, w.getTailReader, config)
+		closeFiles()

w.mu.RLock()
}
@@ -454,19 +478,39 @@ func newSectionReader(f *os.File) (*io.SectionReader, error) {
return io.NewSectionReader(f, 0, size), nil
}

-type decodeFunc func() (*logger.Message, error)
+func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, getTailReader GetTailReaderFunc, config logger.ReadConfig) {
+	nLines := config.Tail
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	// TODO(@cpuguy83): we should plumb a context through instead of dealing with `WatchClose()` here.
+	go func() {
+		select {
+		case <-ctx.Done():
+		case <-watcher.WatchClose():
+			cancel()
+		}
+	}()
+
+	readers := make([]io.Reader, 0, len(files))

-func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDecoderFunc, config logger.ReadConfig) {
-	var rdr io.Reader = f
	if config.Tail > 0 {
-		ls, err := tailfile.TailFile(f, config.Tail)
-		if err != nil {
-			watcher.Err <- err
-			return
+		for i := len(files) - 1; i >= 0 && nLines > 0; i-- {
+			tail, n, err := getTailReader(ctx, files[i], nLines)
+			if err != nil {
+				watcher.Err <- errors.Wrap(err, "error finding file position to start log tailing")
+				return
+			}
+			nLines -= n
+			readers = append([]io.Reader{tail}, readers...)
+		}
+	} else {
+		for _, r := range files {
+			readers = append(readers, &wrappedReaderAt{ReaderAt: r})
		}
-		rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
	}

+	rdr := io.MultiReader(readers...)
decodeLogLine := createDecoder(rdr)
for {
msg, err := decodeLogLine()
@@ -483,7 +527,7 @@ func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDec
return
}
select {
-		case <-watcher.WatchClose():
+		case <-ctx.Done():
return
case watcher.Msg <- msg:
}
@@ -664,3 +708,14 @@ func watchFile(name string) (filenotify.FileWatcher, error) {
}
return fileWatcher, nil
}

type wrappedReaderAt struct {
io.ReaderAt
pos int64
}

func (r *wrappedReaderAt) Read(p []byte) (int, error) {
n, err := r.ReaderAt.ReadAt(p, r.pos)
r.pos += int64(n)
return n, err
}
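Two details worth noting in the new tailFiles path: it walks the rotated files newest to oldest, subtracting each tail reader's reported line count from nLines until the request is satisfied, and when no tail is requested it falls back to wrappedReaderAt, which adapts an io.ReaderAt into a plain sequential io.Reader by tracking an offset. A standalone sketch of that adapter idea (illustrative only, not part of the commit):

package main

import (
	"io"
	"os"
	"strings"
)

// sequentialReader mirrors the wrappedReaderAt trick above: serve
// Read calls from an io.ReaderAt by advancing a saved offset.
type sequentialReader struct {
	io.ReaderAt
	pos int64
}

func (r *sequentialReader) Read(p []byte) (int, error) {
	n, err := r.ReaderAt.ReadAt(p, r.pos)
	r.pos += int64(n)
	return n, err
}

func main() {
	src := strings.NewReader("hello from a ReaderAt\n")
	io.Copy(os.Stdout, &sequentialReader{ReaderAt: src}) // prints the line
}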
